RE: Could not stop job with a savepoint

2022-03-07 Thread Schwalbe Matthias
Bom Dia Vinicius,

Can you still find (and post) the exception stack trace from your jobmanager log? The 
Flink client log does not reveal enough information.
Your situation reminds me of something similar I had.
In the log you might find something like this or similar:

2022-03-07 02:15:41,347 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Triggering stop-with-savepoint for job 
e12f22653f79194863ab426312dd666a.
2022-03-07 02:15:41,380 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 4983974 (type=SAVEPOINT_SUSPEND) @ 1646615741347 for job 
e12f22653f79194863ab426312dd666a.
2022-03-07 02:15:43,042 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline 
checkpoint 4983974 by task 0e659ac720e3e0b3e4072dbc1cc85cd3 of job 
e12f22653f79194863ab426312dd666a at 
container_e1093_1646358077201_0002_01_01 @ ulxxphaddtn02.adgr.net 
(dataPort=44767).
org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:279)
 ~[flink-dist_2.11-1.13.0.jar:1.13.0]

BTW, what Flink version are you running?
What is EMR (what technology is underneath)?



From: Vinicius Peracini 
Sent: Montag, 7. März 2022 20:46
To: Dawid Wysakowicz 
Cc: user@flink.apache.org
Subject: Re: Could not stop job with a savepoint

Hi Dawid, thanks for the reply.

The job was still in progress and producing events. Unfortunately I was not 
able to stop the job with a savepoint or to just create a savepoint. I had to 
stop the job without the savepoint and restore the state using the last 
checkpoint. Still reviewing my configuration and trying to figure out why this 
is happening. Any help would be appreciated.

Thanks!


On Mon, Mar 7, 2022 at 11:56 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

From the exception, it seems the job had already finished when you triggered 
the savepoint.

Best,

Dawid
On 07/03/2022 14:56, Vinicius Peracini wrote:
Hello everyone,

I have a Flink job (version 1.14.0 running on EMR) and I'm having this issue 
while trying to stop a job with a savepoint on S3:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
"df3a3c590fabac737a17f1160c21094c".
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:581)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:569)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1069)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator 
is suspending.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:579)
... 9 more

I'm using incremental and unaligned checkpoints (aligned checkpoint timeout is 
30 seconds). I also tried to create the savepoint without stopping the job 
(using flink savepoint command) and got the same error. Any idea what is 
happening here?

Thanks in advance,

Aviso Legal: Este documento pode conter informações confidenciais e/ou 
privilegiadas. Se você não for o destinatário ou a pessoa autorizada a receber 
este documento, não deve usar, copiar ou divulgar as informações nele contidas 
ou tomar qualquer ação baseada nessas informações.

Disclaimer: The information contained in this document may be privileged and 
confidential and protected from disclosure. If the reader of this document is 
not the intended recipient, or an employee agent responsible for delivering 
this document to the intended recipient, you are hereby notified that any 
dissemination, distribution or copying of this communication is strictly 
prohibited.


Using another FileSystem configuration while creating a job

2022-03-07 Thread Gil De Grove
Hello everyone,

First of all, sorry for cross-posting. I asked on SO, but David Anderson
suggested that I reach out to the community via the mailing list. The link
to the SO question is the following:
https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job

I'll post the answer on SO as soon as I have one :)

I'm posting the content of the question here, so if anyone can help, please let
me know.

Summary

We are currently facing an issue with the FileSystem abstraction in Flink.
We have a job that can dynamically connect to an S3 source (meaning it's
defined at runtime). We discovered a bug in our code, and it could be due
to a wrong assumption on the way the FileSystem works.
Bug explanation

During the initialization of the job (so in the job manager), we manipulate
the FS to check that some files exist in order to fail gracefully before
the job is executed. In our case, we need to set the FS dynamically. It can
be either HDFS, S3 on AWS, or S3 on MinIO. We want the FS configuration to
be specific to the job, and different from the cluster one (different
access key, different endpoint, etc.).

Here is an extract of the code we are using to do so:

  private void validateFileSystemAccess(Configuration configuration) throws IOException {
      // Create a plugin manager from the configuration
      PluginManager pluginManager =
          PluginUtils.createPluginManagerFromRootFolder(configuration);

      // Init the FileSystem from the configuration
      FileSystem.initialize(configuration, pluginManager);

      // Validate the FileSystem: an exception is thrown if the FS configuration is wrong
      Path archiverPath = new Path(this.archiverPath);
      archiverPath.getFileSystem().exists(new Path("/"));
  }

After starting that specific kind of job, we notice that:

   1. the checkpointing does not work for this job, it throws a credential
   error.
   2. the job manager cannot upload the artifacts needed by the history
   server for all jobs already running of all kind (not only this specific
   kind of job).

If we do not deploy that kind of job, the upload of artifacts and the
checkpointing work as expected on the cluster.

We think that this issue might come from the FileSystem.initialize() that
overrides the configuration for all the FileSystems. We think that because
of this, the next call to FileSystem.get() returns the FileSystem we
configured in validateFileSystemAccess instead of the cluster configured
one.
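
[Side note: one way to avoid touching the global Flink FileSystem registry is to validate access through a dedicated Hadoop FileSystem built from a job-specific configuration. A minimal sketch, assuming the S3A connector is on the classpath; the method and key names shown are illustrative, not from the original post:]

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class JobScopedFsCheck {
    // Validates access with a FileSystem instance scoped to this job only, instead of
    // calling org.apache.flink.core.fs.FileSystem.initialize(), which replaces the
    // cluster-wide configuration.
    public static boolean pathExists(String uri, String accessKey, String secretKey, String endpoint)
            throws Exception {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.s3a.access.key", accessKey);
        hadoopConf.set("fs.s3a.secret.key", secretKey);
        hadoopConf.set("fs.s3a.endpoint", endpoint);

        try (FileSystem fs = FileSystem.newInstance(URI.create(uri), hadoopConf)) {
            return fs.exists(new Path(uri));
        }
    }
}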
Questions

Could our hypothesis be correct? If so, how could we provide a specific
configuration for the FileSystem without impacting the whole cluster?
Regards,
Gil


RE: MapState.entries()

2022-03-07 Thread Schwalbe Matthias
Hi Alexey,

To the best of my knowledge it's lazy with the RocksDBStateBackend; using the Java 
iterator you can even modify the map (e.g. remove()).

Cheers

Thias
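
[Side note: a minimal sketch of lazily iterating a RocksDB-backed MapState and removing entries through the iterator, assuming a keyed context where the state has already been registered; not from the original thread:]

import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.state.MapState;

public class MapStatePruning {
    // With the RocksDB state backend, entries() is backed by a lazy iterator over
    // RocksDB; it.remove() deletes the current entry from the state.
    static void pruneOlderThan(MapState<String, Long> state, long cutoff) throws Exception {
        Iterator<Map.Entry<String, Long>> it = state.entries().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < cutoff) {
                it.remove();
            }
        }
    }
}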


From: Alexey Trenikhun 
Sent: Dienstag, 8. März 2022 06:11
To: Flink User Mail List 
Subject: MapState.entries()

Hello,
We are using the RocksDBStateBackend. Is the MapState.entries() call in this case 
"lazy" (deserializing a single entry on each next()), or does MapState.entries() 
return a collection that is fully loaded into memory?


Thanks,
Alexey
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


Flink Statefun Kafka Ingress Record Key Deserializer

2022-03-07 Thread Xin Li
*Hello Flink Team,*

I am currently using Flink Stateful Functions in my project, consuming
Avro-serialized events (both key and value are serialized) from Kafka, but it
seems there is no configuration that users can customize for deserializing the
Kafka record's key: I noticed that the key deserializer is fixed to a UTF-8
String deserializer in RoutableKafkaIngressDeserializer.java.

As a result, the deserialized key becomes garbled, and incorrect hash values
are generated from it, which can lead to an uneven record distribution and
cause data skew.

I wonder if the community would consider adding a configuration option for users
to customize the key deserializer in the Stateful Functions Kafka ingress?

Looking forward to hearing from you

Best regards

*Xin Li*


Flink Checkpoint Timeout

2022-03-07 Thread Mahantesh Patil
Hello Team,

What happens after checkpoint timeout?

Does Flink reprocess data from the previous checkpoint for all tasks?

I have one compute-intensive operator with a parallelism of 20, and only one
of the parallel tasks seems to get stuck because of data skew. On
checkpoint timeout, will data be reprocessed, or will processing of new
data continue? If not, will increasing the checkpoint timeout help?

Checkpoint Configuration:

CheckpointingMode.EXACTLY_ONCE;

CheckPointTimeOut 10 min;

MinPauseBetweenCheckpoints 30 sec;

CheckPointingInterval 30 sec;
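
[Side note: the configuration above corresponds roughly to the following DataStream API sketch; not from the original message:]

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 30 s checkpoint interval with exactly-once mode
        env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);
        // 10 min checkpoint timeout; checkpoints exceeding this are aborted
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
        // at least 30 s between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
    }
}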


Thanks,
Mahantesh


Re: Flatmap operator in an Asynchronous call

2022-03-07 Thread Gen Luo
Hi Diwakar,

An asynchronous flatmap function without the support of the framework can
be problematic. You should not call collector.collect outside the main
thread of the task, i.e. outside the flatMap method.

I'd suggest using a customized Source instead to process the files, which
uses a SplitEnumerator to discover the files and SourceReaders to read the
files. In this way checkpoints can be triggered between two calls of
pollNext, so you don't have to implement it asynchronously. It would be
better if the readers only read the lines and the records are enriched in a
map function that follows.
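
[Side note: in Flink 1.11 the old AsyncCollector API has been replaced by ResultFuture, so asynchronous enrichment is typically written along these lines. A minimal sketch with placeholder enrichment logic; not from the original reply:]

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncEnrichSketch {

    // Enriches a record (here just a String) without blocking the task thread.
    public static class EnrichRecords extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String record, ResultFuture<String> resultFuture) {
            // Placeholder for a real non-blocking client call (HTTP, DB, cache, ...).
            CompletableFuture
                .supplyAsync(() -> record + "|enriched")
                .whenComplete((enriched, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singleton(enriched));
                    }
                });
        }
    }

    public static DataStream<String> enrich(DataStream<String> records) {
        // 30 s timeout per request, at most 100 in-flight requests per subtask.
        return AsyncDataStream.unorderedWait(
            records, new EnrichRecords(), 30, TimeUnit.SECONDS, 100);
    }
}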



On Tue, Mar 8, 2022 at 5:17 AM Diwakar Jha  wrote:

> Hello Everyone,
>
> I'm running a streaming application using Flink 1.11 and EMR 6.01. My use
> case is reading files from a s3 bucket, filter file contents ( say record)
> and enrich each record. Filter records and output to a sink.
> I'm reading 6k files per 15 minutes and the total number of records is 3
> billion per 15 minutes. I'm using a flat map operator to convert the files into
> records and enrich the records in a synchronous call.
>
> *Problem*: My application fails to run (checkpoint timeout) if I add
> more filter criteria (operators). I suspect the system is not able to scale
> (CPU util is still at 20%) because of the synchronous call. I want to convert
> this flat map to an asynchronous call using AsyncFunction. I was looking
> for something like an AsyncCollector.collect
> 
>
> https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html
> to complement my current synchronous implementation using flatmap but it
> seems like this is not available in Flink 1.11.
>
> *Question* :
> Could someone please help me with converting this flatmap operation to an
> asynchronous call?
>
> Please let me know if you have any questions.
>
> Best,
>


MapState.entries()

2022-03-07 Thread Alexey Trenikhun
Hello,
We are using the RocksDBStateBackend. Is the MapState.entries() call in this case 
"lazy" (deserializing a single entry on each next()), or does MapState.entries() 
return a collection that is fully loaded into memory?

Thanks,
Alexey


Re: [External] Require help regarding possible issue/bug I'm facing while using Flink

2022-03-07 Thread Qingsheng Ren
Hi De Xun, 

Unfortunately, MAP, ARRAY and ROW types are only supported by the Flink Parquet format 
since Flink 1.15 (see FLINK-17782 [1], not released yet). You may want to 
upgrade to Flink 1.15 once it is released, or make your own 
implementation based on the latest code on the master branch for now.

Also answered on your StackOverflow post. Hope this would be helpful!

[1] https://issues.apache.org/jira/browse/FLINK-17782

Best Regards, 

Qingsheng

> On Mar 7, 2022, at 18:48, De Xun Chia  wrote:
> 
> Hi Qingsheng Ren,
> 
> 
> Thank you for the help! It worked and I have updated the StackOverflow post 
> with the correct source code answer so other people can use it.
> 
> 
> I am now faced with another issue though - I have created a new StackOverflow 
> post for it. It is about writing out complex Parquet data types. If you are able 
> to examine it please do; otherwise, help me direct it to another dev who has 
> knowledge about it. 
> 
> 
> Have a great week ahead!
> 
> 
> Best Regards,
> 
> De Xun
> 
> 
> On Mon, Mar 7, 2022, 3:19 PM  wrote:
> Hi De Xun, 
> 
> I created an answer on StackOverflow and hope it will be helpful. I'd 
> like to repost my answer here for the convenience of people on the mailing lists.
> 
> The first call of RowRowConverter::toInternal is an internal implementation 
> for making a deep copy of the StreamRecord emitted by the table source, which is 
> independent of the converter in your map function. 
> 
> The reason for the NPE is that the RowRowConverter in the map function is not 
> initialized by calling RowRowConverter::open. You can use a RichMapFunction 
> instead and invoke RowRowConverter::open in RichMapFunction::open.
> 
> Best regards,
> 
> Qingsheng Ren
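
[Side note: a minimal sketch of the suggested RichMapFunction approach; the class name and the rowDataType constructor parameter (the table's row DataType) are illustrative, not from the original reply:]

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class RowToRowDataMapper extends RichMapFunction<Row, RowData> {

    private final RowRowConverter converter;

    public RowToRowDataMapper(DataType rowDataType) {
        this.converter = RowRowConverter.create(rowDataType);
    }

    @Override
    public void open(Configuration parameters) {
        // Initialize the converter once per parallel instance, before any map() call.
        converter.open(getRuntimeContext().getUserCodeClassLoader());
    }

    @Override
    public RowData map(Row row) {
        return converter.toInternal(row);
    }
}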
> 
> > On Mar 7, 2022, at 09:16, Chia De Xun .  wrote:
> > 
> > Greetings,
> > 
> > I'm facing a difficult issue/bug while working with Flink. Would definitely 
> > appreciate some official expert help on this issue. I have posted my 
> > problem on StackOverflow, but have no replies at the moment. 
> > 
> > Let me know if you have any questions/clarifications for me! It would be 
> > best appreciated.
> > 
> > Best Regards,
> > De Xun
> 



Re: Question about Flink counters

2022-03-07 Thread Shane Bishop
Hi Dawid,

My team's Flink application's primary purpose is not to count the number of SQS 
messages received or the number of successful or failed S3 downloads. The 
application's primary purpose is to process events and the corresponding data, 
and for each event, create or update an entry in our database with the 
result of this processing. (I can't really go into too much detail on what is 
processed and what results are produced because that is proprietary 
information.)

As you suggest, the counters for SQS messages and successful or unsuccessful 
downloads from S3 are just for the purposes of monitoring. We use these metrics 
to have an idea of how healthy our application is, and to help indicate which 
components may have faults. We are not using these counters to calculate 
results.

My team is trying to understand why we see inaccurate values for our metrics, 
with the intention of fixing the inaccuracies so we can better monitor our 
application.

I hope this helps to clarify the context of my inquiry.

Best,
Shane


Re: Incremental checkpointing & RocksDB Serialization

2022-03-07 Thread Vidya Sagar Mula
Hi Yun,

Thank you for the response.



   1. You could tune your job to avoid backpressure. Maybe you can upgrade
   your flink engine to at least flink-1.13 to know how to monitor the back
   pressure status [1].

[VIDYA] - From my organization's point of view, upgrading the Flink version
from our current one (1.11) is a very big undertaking. I need to continue
my dev activity with 1.11 only.

   2. You can refer to [2] to know how to customize your serializer.

[VIDYA] - Thanks for providing the link references for a custom
serializer. I am wondering how the serialization part in incremental
checkpointing differs from full checkpointing. My pipeline
logic is the same for both full and incremental checkpoints, except for
the checkpoint.type variable and some other env variables. But the
code pipeline logic should be the same for both types of checkpoints.

- A full checkpoint of the pipeline does not take considerably long compared
to incremental checkpointing at the end of the window. I see that
backpressure and CPU utilization are high with incremental
checkpointing, and a thread dump shows stacks related to serialization. How is
the serialization part different between full and incremental
checkpointing? I know the RocksDB library has some serializers for incremental checkpoints.

- While I am not writing a custom serializer for my pipeline for full
checkpointing, is it the general pattern to implement a custom serializer for
incremental checkpointing?

- With respect to serializers for full vs incremental checkpointing,
what's the general usage pattern across the Flink community? If I write a
custom serializer for incremental checkpointing, how does it work with full checkpointing?

Please clarify.
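
[Side note: two ExecutionConfig switches that are often useful when chasing Kryo/GenericType fallbacks; a sketch, not from the original mail:]

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fail fast whenever a type would fall back to Kryo (GenericType); this helps
        // locate the classes that need to be proper POJOs or Avro records.
        env.getConfig().disableGenericTypes();

        // Prefer Avro over Flink's POJO serializer for POJO types.
        env.getConfig().enableForceAvro();
    }
}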

Thanks,
Vidya.




[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/back_pressure/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/custom_serialization/

On Sun, Mar 6, 2022 at 12:11 AM Yun Tang  wrote:

> Hi Vidya,
>
>
>1. You could tune your job to avoid backpressure. Maybe you can
>upgrade your flink engine to at least flink-1.13 to know how to monitor the
>back pressure status [1]
>2. You can refer to [2] to know how to custom your serializer.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/back_pressure/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/custom_serialization/
>
> Best,
> Yun Tang
> --
> *From:* Vidya Sagar Mula 
> *Sent:* Sunday, March 6, 2022 4:16
> *To:* Yun Tang 
> *Cc:* user 
> *Subject:* Re: Incremental checkpointing & RocksDB Serialization
>
> Hi Yun Tang,
> Thank you for the reply. I have follow up questions and need some more
> details. Can you please clarify my inline questions?
>
> > Why is the incremental checkpointing taking more time for the snapshot
> at the end of the window duration?
>
> I guess that this is because the job is under back pressure on end of
> window. You can expand the checkpoint details to see whether that the async
> duration of each task is much slower than the e2e duration? If so, this
> caused the checkpoint barrier stay in the channel longer.
>
> * - Yes, I expanded the checkpoint details and noticed the e2e duration
> is much higher than the async duration. Attaching the screenshot
> here (Checkpoint #59). Can you elaborate more on "checkpoint barrier
> stays in the channel longer"? What are the suggested ways to mitigate this
> issue? I am wondering how this can be avoided, as it is happening only at
> the end of the window.*
>
>
> > Do you suggest any change in the serializer type in the RocksDB? (Kryo
> vs Avro)
>
> From our experience,  kryo is not a good choice in most cases.
>
>
> * - What are your recommendations on other serializers? I tried to
> change it to Avro by setting the flag "forceAvro" to TRUE in the Execution
> Config, but RocksDB is still picking the KryoSerializer. This is
> because the transformation's key type is assigned as GenericType. I am not
> sure what changes need to be made to my class/POJO to use the Avro
> serializer. Can you please suggest how to change to other, better
> serializers?*
>
>
>
> On Fri, Mar 4, 2022 at 2:06 AM Yun Tang  wrote:
>
> Hi Vidya,
>
> > Why is the incremental checkpointing taking more time for the snapshot
> at the end of the window duration?
>
> I guess that this is because the job is under back pressure on end of
> window. You can expand the checkpoint details to see whether that the async
> duration of each task is much slower than the e2e duration? If so, this
> caused the checkpoint barrier stay in the channel longer.
>
> > Why is RocksDB serialization causing the CPU peak?
>
> This is caused by the implementation of your serializer.
>
> > Do you suggest any change in the serializer type in the RocksDB? (Kryo
> vs Avro)
>
> From our experience,  kryo is not 

Flatmap operator in an Asynchronous call

2022-03-07 Thread Diwakar Jha
Hello Everyone,

I'm running a streaming application using Flink 1.11 and EMR 6.01. My use
case is reading files from a s3 bucket, filter file contents ( say record)
and enrich each record. Filter records and output to a sink.
I'm reading 6k files per 15 minutes and the total number of records is 3
billion per 15 minutes. I'm using a flat map operator to convert the files into
records and enrich the records in a synchronous call.

*Problem*: My application fails to run (checkpoint timeout) if I add more
filter criteria (operators). I suspect the system is not able to scale (CPU
util is still at 20%) because of the synchronous call. I want to convert this
flat map to an asynchronous call using AsyncFunction. I was looking for
something like an AsyncCollector.collect


https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html
to complement my current synchronous implementation using flatmap but it
seems like this is not available in Flink 1.11.

*Question* :
Could someone please help me with converting this flatmap operation to an
asynchronous call?

Please let me know if you have any questions.

Best,


Re: Could not stop job with a savepoint

2022-03-07 Thread Vinicius Peracini
Hi Dawid, thanks for the reply.

The job was still in progress and producing events. Unfortunately I was not
able to stop the job with a savepoint or to just create a savepoint. I had
to stop the job without the savepoint and restore the state using the last
checkpoint. Still reviewing my configuration and trying to figure out why
this is happening. Any help would be appreciated.

Thanks!


On Mon, Mar 7, 2022 at 11:56 AM Dawid Wysakowicz 
wrote:

> Hi,
>
> From the exception, it seems the job had already finished when you
> triggered the savepoint.
>
> Best,
>
> Dawid
> On 07/03/2022 14:56, Vinicius Peracini wrote:
>
> Hello everyone,
>
> I have a Flink job (version 1.14.0 running on EMR) and I'm having this
> issue while trying to stop a job with a savepoint on S3:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "df3a3c590fabac737a17f1160c21094c".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:581)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:569)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1069)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.util.concurrent.ExecutionException:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
> Coordinator is suspending.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:579)
> ... 9 more
>
> I'm using incremental and unaligned checkpoints (aligned checkpoint
> timeout is 30 seconds). I also tried to create the savepoint without
> stopping the job (using flink savepoint command) and got the same error.
> Any idea what is happening here?
>
> Thanks in advance,
>
>
>

-- 
Aviso Legal: Este documento pode conter informações confidenciais e/ou 
privilegiadas. Se você não for o destinatário ou a pessoa autorizada a 
receber este documento, não deve usar, copiar ou divulgar as informações 
nele contidas ou tomar qualquer ação baseada nessas informações.


Disclaimer: The information contained in this document may be privileged 
and confidential and protected from disclosure. If the reader of this 
document is not the intended recipient, or an employee agent responsible 
for delivering this document to the intended recipient, you are hereby 
notified that any dissemination, distribution or copying of this 
communication is strictly prohibited.



Re: How to sort Iterable in ProcessWindowFunction?

2022-03-07 Thread HG
And the comparator function.

The order of the returned 1, 0, -1 is relevant.
I discovered that returning them in the order -1, 0, 1 will sort descending.

public static class SortEventsHandlingTime implements Comparator<Tuple4> {

    // Let's compare 2 Tuple4 objects by their first field, parsed as a long
    public int compare(Tuple4 o1, Tuple4 o2) {
        if (Long.parseLong(o1.getField(0).toString()) >
                Long.parseLong(o2.getField(0).toString())) {
            return 1;
        } else if (Long.parseLong(o1.getField(0).toString()) ==
                Long.parseLong(o2.getField(0).toString())) {
            return 0;
        } else {
            return -1;
        }
    }
}
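
[Side note: an equivalent, more compact comparator using Comparator.comparingLong; chain .reversed() for descending order. Not from the original mail:]

import java.util.Comparator;
import org.apache.flink.api.java.tuple.Tuple4;

public class SortEventsHandlingTimeCompact {
    // Same ordering as above: compare by the first tuple field, parsed as a long.
    static final Comparator<Tuple4<?, ?, ?, ?>> BY_FIELD_0 =
        Comparator.comparingLong(t -> Long.parseLong(t.getField(0).toString()));
}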


Op ma 7 mrt. 2022 om 03:05 schreef yidan zhao :

> Collect the elements to a list, then sort, then collect out.
>
> HG  于2022年3月3日周四 22:13写道:
>
>>   Hi,
>> I have need to sort the input of the ProcesWindowFunction by one of the
>> fields of the Tuple4 that is in the Iterator.
>>
>> Any advice as to what the best way is?
>>
>>  static class MyProcessWindowFunction extends
>> ProcessWindowFunction, String, String,
>> TimeWindow> {
>> @Override
>> public void process(String key, Context context,
>> Iterable> input, Collector out)
>> {
>> Long elapsed   = 0L;
>> Long pHandlingTime = 0L;
>> Long totalElapsed  = 0L
>>
>> System.out.println(input.getClass());
>>
>> Iterator> etter =
>> input.iterator();
>> *for (Tuple4 in: input){*
>> transactionId = in.getField(2).toString();
>> elapsed   = Long.parseLong(in.getField(1).toString())
>> - pHandlingTime;
>> totalElapsed  = totalElapsed + elapsed;
>> pHandlingTime = Long.parseLong(in.getField(1).toString())
>>
>> out.collect("Key : " + key + " Window : " +
>> context.window() + "  transactionId : "  + transactionId + "  elapsed : " +
>> elapsed.toString() + "  max handling time : " + h.toString() + "
>> totalElapsed " + totalElapsed);
>> }
>> }
>> }
>>
>>
>> Op do 3 mrt. 2022 om 15:12 schreef HG :
>>
>>> Hi,
>>> I have need to sort the input of the ProcesWindowFunction by one of the
>>> fields of the Tuple4 that is in the Iterator.
>>>
>>>  static class MyProcessWindowFunction extends
>>> ProcessWindowFunction, String, String,
>>> TimeWindow> {
>>> @Override
>>> public void process(String key, Context context,
>>> Iterable> input, Collector out)
>>> {
>>> Long elapsed   = 0L;
>>> Long pHandlingTime = 0L;
>>> Long totalElapsed  = 0L
>>>
>>> System.out.println(input.getClass());
>>>
>>> Iterator> etter =
>>> input.iterator();
>>> *for (Tuple4 in: input){*
>>> transactionId = in.getField(2).toString();
>>> elapsed   =
>>> Long.parseLong(in.getField(1).toString()) - pHandlingTime;
>>> totalElapsed  = totalElapsed + elapsed;
>>> pHandlingTime = Long.parseLong(in.getField(1).toString())
>>>
>>> out.collect("Key : " + key + " Window : " +
>>> context.window() + "  transactionId : "  + transactionId + "  elapsed : " +
>>> elapsed.toString() + "  max handling time : " + h.toString() + "
>>> totalElapsed " + totalElapsed);
>>> }
>>> }
>>> }
>>>
>>


Re: Task Manager shutdown causing jobs to fail

2022-03-07 Thread Zhilong Hong
Hi, Puneet:

As Terry says, if you find your job failed unexpectedly, you could check
the restart-strategy configuration in your flink-conf.yaml. If the restart
strategy is set to disabled or none, the job will transition to FAILED
once it encounters a failover. The job will also fail itself if the
failover rate or the number of attempts exceeds the limit. For more information please
refer to [1] and [2].

Best,
Zhilong

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#fault-tolerance
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#exponential-delay-restart-strategy
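
[Side note: a restart strategy can also be set programmatically; a fixed-delay sketch, with the equivalent flink-conf.yaml keys shown in the comment. Not from the original reply:]

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Equivalent flink-conf.yaml settings:
        //   restart-strategy: fixed-delay
        //   restart-strategy.fixed-delay.attempts: 3
        //   restart-strategy.fixed-delay.delay: 10 s
        env.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}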

On Mon, Mar 7, 2022 at 11:45 PM Puneet Duggal 
wrote:

> Hi Terry Wang,
>
> So adding to the above context: whenever a task manager goes down, the
> jobs go into a failed state and do not restart, even though there are
> enough free slots available on other task managers to restart them on.
>
> Regards,
> Puneet
>
> On 04-Mar-2022, at 4:54 PM, Terry Wang  wrote:
>
> Hi, Puneet~
>
> AFAIK, it is expected behavior that jobs on a crashed TaskManager
> restart. HA means there is no single point of failure, but a Flink job still needs
> to go through failover to ensure state and data consistency. You may refer to
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/
> for more details.
>
> On Fri, Mar 4, 2022 at 2:50 AM Puneet Duggal 
> wrote:
>
>> Hi,
>>
>> Currently in production, I have an HA session-mode Flink cluster with 3 job
>> managers and multiple task managers with more than enough free task slots.
>> But I have seen multiple times that whenever a task manager goes down (e.g.
>> due to a heartbeat issue), so do all the jobs running on it, even when
>> there are standby task managers available with free slots to run them on.
>> Has anyone faced this issue?
>>
>> Regards,
>> Puneet
>
>
>
> --
> Best Regards,
> Terry Wang
>
>
>


Re: Task Manager shutdown causing jobs to fail

2022-03-07 Thread Puneet Duggal
Hi Terry Wang,

So adding to the above context: whenever a task manager goes down, the jobs go 
into a failed state and do not restart, even though there are enough free 
slots available on other task managers to restart them on.

Regards,
Puneet

> On 04-Mar-2022, at 4:54 PM, Terry Wang  wrote:
> 
> Hi, Puneet~
> 
> AFAIK, it is expected behavior that jobs on a crashed TaskManager 
> restart. HA means there is no single point of failure, but a Flink job still needs to 
> go through failover to ensure state and data consistency. You may refer to 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/
>  
> 
>  for more details.
> 
> On Fri, Mar 4, 2022 at 2:50 AM Puneet Duggal  > wrote:
> Hi,
> 
> Currently in production, I have an HA session-mode Flink cluster with 3 job 
> managers and multiple task managers with more than enough free task slots. 
> But I have seen multiple times that whenever a task manager goes down (e.g. 
> due to a heartbeat issue), so do all the jobs running on it, even when there 
> are standby task managers available with free slots to run them on. Has 
> anyone faced this issue?
> 
> Regards, 
> Puneet
> 
> 
> -- 
> Best Regards,
> Terry Wang



Re: Could not stop job with a savepoint

2022-03-07 Thread Dawid Wysakowicz

Hi,

From the exception, it seems the job had already finished when you triggered 
the savepoint.


Best,

Dawid

On 07/03/2022 14:56, Vinicius Peracini wrote:

Hello everyone,

I have a Flink job (version 1.14.0 running on EMR) and I'm having this 
issue while trying to stop a job with a savepoint on S3:


org.apache.flink.util.FlinkException: Could not stop with a savepoint 
job "df3a3c590fabac737a17f1160c21094c".
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:581)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)

at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:569)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1069)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
Coordinator is suspending.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:579)

... 9 more

I'm using incremental and unaligned checkpoints (aligned checkpoint 
timeout is 30 seconds). I also tried to create the savepoint without 
stopping the job (using flink savepoint command) and got the same 
error. Any idea what is happening here?


Thanks in advance,

Aviso Legal: Este documento pode conter informações confidenciais e/ou 
privilegiadas. Se você não for o destinatário ou a pessoa autorizada a 
receber este documento, não deve usar, copiar ou divulgar as 
informações nele contidas ou tomar qualquer ação baseada nessas 
informações.


Disclaimer: The information contained in this document may be 
privileged and confidential and protected from disclosure. If the 
reader of this document is not the intended recipient, or an employee 
agent responsible for delivering this document to the intended 
recipient, you are hereby notified that any dissemination, 
distribution or copying of this communication is strictly prohibited.




Re: Question about Flink counters

2022-03-07 Thread Dawid Wysakowicz

Hi Shane,

I don't think counters, or should I say metrics, are the right 
abstraction for the use case you described. Metrics are a way to get 
insight into a running job and its current state; they are not a 
good means to calculate results. Metrics are not stateful and are not 
preserved across restarts. Counters are generally scoped, so 
counters in UDFs are scoped [1] to the parallel instance that uses them. 
You should combine them on the monitoring-system side if you need a more 
general overview.



Hope that helps,

Best,

Dawid


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#scope
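
[Side note: the thread-safe counter implementation mentioned later in this thread could look roughly like this; it would be registered in a RichFunction's open() via getRuntimeContext().getMetricGroup().counter("myCounter", new AtomicCounter()). A sketch, not from the original reply:]

import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.metrics.Counter;

public class AtomicCounter implements Counter {
    // Backed by an AtomicLong so that concurrent increments from several threads
    // within one parallel instance are not lost.
    private final AtomicLong count = new AtomicLong();

    @Override public void inc() { count.incrementAndGet(); }
    @Override public void inc(long n) { count.addAndGet(n); }
    @Override public void dec() { count.decrementAndGet(); }
    @Override public void dec(long n) { count.addAndGet(-n); }
    @Override public long getCount() { return count.get(); }
}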


On 06/03/2022 15:13, Shane Bishop wrote:

Hi Zhanghao Chen,

Sure, I can give some context.

My team's Flink application runs as a Kinesis Data Analytics streaming 
application [1] in AWS.


Our application receives events from Amazon Simple Queue Service (SQS) 
[2] in our source, and then uses a property of the SQS event to 
download from Amazon S3 [3]. The external metrics system for our 
counters is Amazon CloudWatch metrics [4].


For both the SQS consumer source and our S3 downloader operator, we 
have a counter for number of received items, number of successfully 
processed items, and number of items that failed to process.


However, during testing we have found that the count for SQS events 
received and S3 downloads is much too high. The counts for our 
counters in CloudWatch is much higher than the number of records 
reported in the Flink dashboard.


The goal is that our metrics in CloudWatch should accurately reflect 
the number of SQS events received and successfully or unsuccessfully 
processed, and the number of S3 downloads that were attempted and 
succeeded or failed.


I am looking for help understanding why our counter values are inaccurate.

[1] https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html
[2] 
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html

[3] https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html
[4] 
https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html



*From:* Zhanghao Chen 
*Sent:* March 5, 2022 11:11 PM
*To:* Shane Bishop ; user@flink.apache.org 


*Subject:* Re: Question about Flink counters
Hi Shane,

Could you share more information on what you would like to use the 
counter for?


The counter discussed here is primarily designed for exposing counts 
to external metric systems. Usually, each task would count on its 
own, and it is left for the external metric system (usu. a time series 
database) to do aggregations. Also, you cannot reference a counter 
from a different machine. I'm not sure if this is what you expected.


Best,
Zhanghao Chen

*From:* Shane Bishop 
*Sent:* Saturday, March 5, 2022 23:22
*To:* Zhanghao Chen ; user@flink.apache.org 


*Subject:* Re: Question about Flink counters
If I used a thread-safe counter implementation, would that be enough 
to make the count correct for a Flink cluster with multiple machines?


Best,
Shane

*From:* Zhanghao Chen 
*Sent:* March 4, 2022 11:08 PM
*To:* Shane Bishop ; user@flink.apache.org 


*Subject:* Re: Question about Flink counters
Hi Shane,

Flink provides a generic counter interface with a few implementations. 
The default implementation, SimpleCounter, which is not 
thread-safe, is used when you call counter(String name) on a 
MetricGroup. Therefore, you'll need to use your own thread-safe 
implementation; check out the second example of Metrics | Apache Flink 
for reference.


Best,
Zhanghao Chen

*From:* Shane Bishop 
*Sent:* Saturday, March 5, 2022 5:24
*To:* user@flink.apache.org 
*Subject:* Question about Flink counters
Hi all,

For Flink counters [1], are increment operations guaranteed to be 
atomic across all parallel tasks? I.e., is there a guarantee that the 
counter values will not be higher than expected?


Thanks,
Shane

---
[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter




Re: How to sort Iterable in ProcessWindowFunction?

2022-03-07 Thread HG
For the record, so that other inexperienced people may benefit too 😬

List<Tuple4> inputList = new ArrayList<>();

input.forEach(inputList::add);
inputList.sort(new SortEventsHandlingTime());

for (Tuple4 in : inputList) {


Op ma 7 mrt. 2022 om 03:05 schreef yidan zhao :

> Collect the elements to a list, then sort, then collect out.
>
> HG  于2022年3月3日周四 22:13写道:
>
>>   Hi,
>> I have need to sort the input of the ProcesWindowFunction by one of the
>> fields of the Tuple4 that is in the Iterator.
>>
>> Any advice as to what the best way is?
>>
>>  static class MyProcessWindowFunction extends
>> ProcessWindowFunction, String, String,
>> TimeWindow> {
>> @Override
>> public void process(String key, Context context,
>> Iterable> input, Collector out)
>> {
>> Long elapsed   = 0L;
>> Long pHandlingTime = 0L;
>> Long totalElapsed  = 0L
>>
>> System.out.println(input.getClass());
>>
>> Iterator> etter =
>> input.iterator();
>> *for (Tuple4 in: input){*
>> transactionId = in.getField(2).toString();
>> elapsed   = Long.parseLong(in.getField(1).toString())
>> - pHandlingTime;
>> totalElapsed  = totalElapsed + elapsed;
>> pHandlingTime = Long.parseLong(in.getField(1).toString())
>>
>> out.collect("Key : " + key + " Window : " +
>> context.window() + "  transactionId : "  + transactionId + "  elapsed : " +
>> elapsed.toString() + "  max handling time : " + h.toString() + "
>> totalElapsed " + totalElapsed);
>> }
>> }
>> }
>>
>>
>> Op do 3 mrt. 2022 om 15:12 schreef HG :
>>
>>> Hi,
>>> I have need to sort the input of the ProcesWindowFunction by one of the
>>> fields of the Tuple4 that is in the Iterator.
>>>
>>>  static class MyProcessWindowFunction extends
>>> ProcessWindowFunction, String, String,
>>> TimeWindow> {
>>> @Override
>>> public void process(String key, Context context,
>>> Iterable> input, Collector out)
>>> {
>>> Long elapsed   = 0L;
>>> Long pHandlingTime = 0L;
>>> Long totalElapsed  = 0L
>>>
>>> System.out.println(input.getClass());
>>>
>>> Iterator> etter =
>>> input.iterator();
>>> *for (Tuple4 in: input){*
>>> transactionId = in.getField(2).toString();
>>> elapsed   =
>>> Long.parseLong(in.getField(1).toString()) - pHandlingTime;
>>> totalElapsed  = totalElapsed + elapsed;
>>> pHandlingTime = Long.parseLong(in.getField(1).toString())
>>>
>>> out.collect("Key : " + key + " Window : " +
>>> context.window() + "  transactionId : "  + transactionId + "  elapsed : " +
>>> elapsed.toString() + "  max handling time : " + h.toString() + "
>>> totalElapsed " + totalElapsed);
>>> }
>>> }
>>> }
>>>
>>


Could not stop job with a savepoint

2022-03-07 Thread Vinicius Peracini
Hello everyone,

I have a Flink job (version 1.14.0 running on EMR) and I'm having this
issue while trying to stop a job with a savepoint on S3:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job
"df3a3c590fabac737a17f1160c21094c".
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:581)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:569)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1069)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.ExecutionException:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
Coordinator is suspending.
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:579)
... 9 more

I'm using incremental and unaligned checkpoints (aligned checkpoint timeout
is 30 seconds). I also tried to create the savepoint without stopping the
job (using flink savepoint command) and got the same error. Any idea what
is happening here?

Thanks in advance,

-- 
Aviso Legal: Este documento pode conter informações confidenciais e/ou 
privilegiadas. Se você não for o destinatário ou a pessoa autorizada a 
receber este documento, não deve usar, copiar ou divulgar as informações 
nele contidas ou tomar qualquer ação baseada nessas informações.


Disclaimer: The information contained in this document may be privileged 
and confidential and protected from disclosure. If the reader of this 
document is not the intended recipient, or an employee agent responsible 
for delivering this document to the intended recipient, you are hereby 
notified that any dissemination, distribution or copying of this 
communication is strictly prohibited.



Re: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-07 Thread Jing Ge
Hi,

Thanks Martijn for driving this discussion. Your concerns are very
rational.

We should do our best to keep the Flink development on the right track. I
would suggest discussing it in a vision/goal oriented way. Since Flink has
a clear vision of unified batch and stream processing, supporting batch
jobs will be one of the critical core features to help us reach the vision
and let Flink have an even bigger impact in the industry. I fully agree
with you that we should not focus on the Hive query syntax. Instead of it,
we should build a plan/schedule to support batch query syntax for the
vision. If there is any conflict between Hive query syntax and common batch
query syntax, we should stick with the common batch query syntax. For any
Hive specific query syntax, which is not supported as a common case by
other batch process engines, we should think very carefully and implement
it as a dialect extension like you suggested, but only when it is a
critical business requirement and has broad impact on many use cases. Last
but not least, from architecture's perspective, it is good to have the
capability to support arbitrary syntax via dialect/extension/plugin. But it
will also require a lot of effort to make it happen. Trade-off is always
the key. Currently, I have to agree with you again, we should focus more on
the common (batch) cases.


Best regards,
Jing

On Mon, Mar 7, 2022 at 1:53 PM Jing Zhang  wrote:

> Hi Martijn,
>
> Thanks for driving this discussion.
>
> +1 on efforts on more hive syntax compatibility.
>
> With the efforts on batch processing in recent versions(1.10~1.15), many
> users have run batch processing jobs based on Flink.
> In our team, we are trying to migrate most of the existing online batch
> jobs from Hive/Spark to Flink. We hope this migration does not require
> users to modify their sql.
> Although Hive is not as popular as it used to be, Hive SQL is still alive
> because many users still use Hive SQL to run spark jobs.
> Therefore, compatibility with more HIVE syntax is critical to this
> migration work.
>
> Best,
> Jing Zhang
>
>
>
> Martijn Visser  于2022年3月7日周一 19:23写道:
>
>> Hi everyone,
>>
>> Flink currently has 4 APIs with multiple language support which can be
>> used
>> to develop applications:
>>
>> * DataStream API, both Java and Scala
>> * Table API, both Java and Scala
>> * Flink SQL, both in Flink query syntax and Hive query syntax (partially)
>> * Python API
>>
>> Since FLIP-152 [1] the Flink SQL support has been extended to also support
>> the Hive query syntax. There is now a follow-up FLINK-26360 [2] to address
>> more syntax compatibility issues.
>>
>> I would like to open a discussion on Flink directly supporting the Hive
>> query syntax. I have some concerns if having a 100% Hive query syntax is
>> indeed something that we should aim for in Flink.
>>
>> I can understand that having Hive query syntax support in Flink could help
>> users due to interoperability and being able to migrate. However:
>>
>> - Adding full Hive query syntax support will mean that we go from 6 fully
>> supported API/language combinations to 7. I think we are currently already
>> struggling with maintaining the existing combinations, let alone one
>> more.
>> - Apache Hive is/appears to be a project that's not that actively
>> developed
>> anymore. The last release was made in January 2021. Its popularity is
>> rapidly declining in Europe and the United States, also due to Hadoop becoming
>> less popular.
>> - Related to the previous topic, other software like Snowflake,
>> Trino/Presto, Databricks are becoming more and more popular. If we add
>> full
>> support for the Hive query syntax, then why not add support for Snowflake
>> and the others?
>> - We are supporting Hive versions that are no longer supported by the Hive
>> community with known security vulnerabilities. This makes Flink also
>> vulnerable to those types of vulnerabilities.
>> - The current Hive implementation is done by using a lot of internals of
>> Flink, making Flink hard to maintain, with lots of tech debt and making
>> things overly complex.
>>
>> From my perspective, I think it would be better to not have Hive query
>> syntax compatibility directly in Flink itself. Of course we should have a
>> proper Hive connector and a proper Hive catalog to make connectivity with
>> Hive (the versions that are still supported by the Hive community) itself
>> possible. Alternatively, if Hive query syntax is so important, it should
>> not rely on internals but be available as a dialect/pluggable option. That
>> could also open up the possibility to add more syntax support for others
>> in
>> the future, but I really think we should just focus on Flink SQL itself.
>> That's already hard enough to maintain and improve on.
>>
>> I'm looking forward to the thoughts of both Developers and Users, so I'm
>> cross-posting to both mailing lists.
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>>
>> [1]
>> ht

Re: Shaded zookeeper - curator mismatch?

2022-03-07 Thread Zhanghao Chen
Hi Filip,

Curator 4.2 is compatible with the ZK 3.4 series. When it detects that a ZK 3.4 
client is used, it will operate in ZK 3.4-compatible mode and won't call 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;).

Curator detects whether the ZK client is of the 3.4 series by checking for the 
existence of 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.admin.ZooKeeperAdmin on the 
classpath (see 
org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility). You 
can try searching for that class in your application's classpath to see if a 
higher version of the ZK client was mistakenly packaged.
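
[Side note: a quick sketch for checking whether the class Curator uses for this version detection is on your application's classpath; not from the original reply:]

public class ZkClientVersionCheck {
    public static void main(String[] args) {
        String marker = "org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.admin.ZooKeeperAdmin";
        try {
            Class.forName(marker);
            // Curator will assume a ZK 3.5+ client and use the newer ensemble APIs.
            System.out.println("ZooKeeperAdmin found: ZK 3.5+ client on the classpath");
        } catch (ClassNotFoundException e) {
            // Curator falls back to its ZK 3.4 compatibility mode.
            System.out.println("ZooKeeperAdmin not found: ZK 3.4 client on the classpath");
        }
    }
}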

Hope that helps.

Best,
Zhanghao Chen

From: Filip Karnicki 
Sent: Monday, March 7, 2022 16:49
To: Zhanghao Chen 
Cc: user 
Subject: Re: Shaded zookeeper - curator mismatch?

Hi  Zhanghao

it's 3.5.5

Thank you
Fil

On Sat, 5 Mar 2022 at 08:12, Zhanghao Chen 
mailto:zhanghao.c...@outlook.com>> wrote:
Hi Filip,

Could you share the version of the ZK server you are connecting to?


Best,
Zhanghao Chen

From: Filip Karnicki mailto:filip.karni...@gmail.com>>
Sent: Friday, March 4, 2022 23:12
To: user mailto:user@flink.apache.org>>
Subject: Shaded zookeeper - curator mismatch?

Hi, I believe there's a mismatch in shaded zookeeper/curator dependencies. I 
see that curator 4.2.0 needs zookeeper 3.5.4-beta, but it's still used in 
flink-shaded-zookeeper-34, which as far as I can tell is used by flink runtime 
1.14.3

https://mvnrepository.com/artifact/org.apache.flink/flink-runtime/1.14.3

https://github.com/apache/flink-shaded/blob/release-14.0/flink-shaded-zookeeper-parent/flink-shaded-zookeeper-34/pom.xml

I believe this might be causing an issue for us while running a statefun uber 
jar in a cloudera cluster (indeed QuorumMaj only has a constructor that takes 
an int in zk 3.4.x, not a map)

Am I understanding this correctly? Is this something that we need to fix with a 
patch?



org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnJobClusterEntrypoint.

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:216)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:617)
 [fatty.fil.jar:?]

at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
 [flink-dist_2.12-1.14.0-csa1.6.1.0.jar:1.14.0-csa1.6.1.0]

Caused by: java.lang.NoSuchMethodError: 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V

at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:57)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:159)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:165)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:248)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:233)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:361)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:318)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193)
 ~[fatty.fil.jar:?]

at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_212]

at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_212]

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
 ~[hadoop-common-3.1.1.7.1.7.74-6.jar:?]

at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[fatty.fil.jar:?]

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
 ~[fatty.fil.jar:?]

... 2 more



Many thanks

Fil


Re: Max parallelism and reactive mode

2022-03-07 Thread Roman Khachatryan
Hi Alexis,

> Is it possible to have different values of max parallelism in different 
> operators?
Yes, it is possible, please refer to [1] and [2] for API details.

> I did a test in which my source had a max parallelism of 3, whereas a 
> downstream operator had a (non-max) parallelism explicitly set to 4, and the 
> job could not be started. Could this related to operator chaining? Or maybe 
> the whole job ended up with a max parallelism of 3 because I didn't set it 
> and it took the value from the source?
Could you share the error details? I guess the downstream operator
inherited its upstream's max parallelism, so its parallelism of 4
exceeded the max parallelism of 3.

> Additionally, the documentation states that, in reactive mode, only max 
> parallelism is taken into account, so if I want to limit the number of 
> parallel instances of my sources and sinks, I'd have to set their max 
> parallelism, and that would be different from that of the rest of the 
> operators.
> Moreover, is it possible to switch a job from non-reactive to reactive mode 
> via savepoints? What happens if my max parallelism settings change during the 
> switch? For example, to limit my sink to a single instance.
No, max parallelism can not be changed because the state in the
savepoint is distributed according to it.

> In summary, for a hypothetical pipeline that basically does something like: 
> source (parallelism between 1 & 3) -> stateful operator (parallelism between 
> 1 & 32) -> sink (parallelism exactly 1 always)
what should I do regarding max parallelism (both for execution env an
operators) in normal mode, what should I do in reactive mode, and can
I switch between modes with savepoints?
I'm assuming that the stream is keyed (for non-keyed operators max
parallelism doesn't make much sense).
I think you can have the same max parallelism in both modes and switch
safely. From your example, each operator should have a different max
parallelism (source:3 -> stateful-operator:32 -> sink:1), so it
should be configured at the operator level. You'll probably want to
explore a higher max parallelism to get more efficient state
distribution and rescaling.
In normal mode, you can additionally set the parallelism manually.
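
For illustration, here is a minimal DataStream sketch (Java) of that
per-operator configuration. The sequence source, the modulo key and the
reduce are only placeholders for a real pipeline, and the job-wide default
of 32 is an assumption:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Job-wide default max parallelism (see [1]); operators below override it.
        env.setMaxParallelism(32);

        // Source: bound to at most 3 parallel instances (placeholder source).
        DataStream<Long> source = env
                .fromSequence(0, 1_000)
                .setMaxParallelism(3);

        // Keyed, stateful operator: allow scaling up to 32 instances.
        DataStream<Long> aggregated = source
                .keyBy(v -> v % 10)          // placeholder key
                .reduce((a, b) -> a + b)
                .setMaxParallelism(32);

        // Sink: a single instance. In reactive mode only max parallelism is
        // honoured, so there the sink would have to be bounded via the max
        // parallelism of its Transformation instead (see [2]).
        aggregated.print().setParallelism(1);

        env.execute("max-parallelism-sketch");
    }
}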

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#setMaxParallelism-int-
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/api/dag/Transformation.html#setMaxParallelism-int-

Regards,
Roman

On Thu, Mar 3, 2022 at 11:34 PM Alexis Sarda-Espinosa
 wrote:
>
> Hi everyone,
>
> I have some questions regarding max parallelism and how it interacts with 
> deployment modes. The documentation states that max parallelism should be 
> "set on a per-job and per-operator granularity" but doesn't provide more 
> details. Is it possible to have different values of max parallelism in 
> different operators? I did a test in which my source had a max parallelism of 
> 3, whereas a downstream operator had a (non-max) parallelism explicitly set 
> to 4, and the job could not be started. Could this be related to operator 
> chaining? Or maybe the whole job ended up with a max parallelism of 3 because 
> I didn't set it and it took the value from the source?
>
> Additionally, the documentation states that, in reactive mode, only max 
> parallelism is taken into account, so if I want to limit the number of 
> parallel instances of my sources and sinks, I'd have to set their max 
> parallelism, and that would be different from that of the rest of the 
> operators.
>
> Moreover, is it possible to switch a job from non-reactive to reactive mode 
> via savepoints? What happens if my max parallelism settings change during the 
> switch? For example, to limit my sink to a single instance.
>
> In summary, for a hypothetical pipeline that basically does something like: 
> source (parallelism between 1 & 3) -> stateful operator (parallelism between 
> 1 & 32) -> sink (parallelism exactly 1 always)
> what should I do regarding max parallelism (both for execution env and 
> operators) in normal mode, what should I do in reactive mode, and can I 
> switch between modes with savepoints?
>
> Regards,
> Alexis.
>


Re: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-07 Thread Jing Zhang
Hi Martijn,

Thanks for driving this discussion.

+1 on efforts on more hive syntax compatibility.

With the efforts on batch processing in recent versions (1.10-1.15), many
users have run batch processing jobs based on Flink.
In our team, we are trying to migrate most of the existing online batch
jobs from Hive/Spark to Flink. We hope this migration does not require
users to modify their SQL.
Although Hive is not as popular as it used to be, Hive SQL is still alive
because many users still use Hive SQL to run Spark jobs.
Therefore, compatibility with more Hive syntax is critical to this
migration work.

Best,
Jing Zhang



Martijn Visser  wrote on Mon, 7 Mar 2022 at 19:23:

> Hi everyone,
>
> Flink currently has 4 APIs with multiple language support which can be used
> to develop applications:
>
> * DataStream API, both Java and Scala
> * Table API, both Java and Scala
> * Flink SQL, both in Flink query syntax and Hive query syntax (partially)
> * Python API
>
> Since FLIP-152 [1] the Flink SQL support has been extended to also support
> the Hive query syntax. There is now a follow-up FLINK-26360 [2] to address
> more syntax compatibility issues.
>
> I would like to open a discussion on Flink directly supporting the Hive
> query syntax. I have some concerns about whether 100% Hive query syntax
> compatibility is indeed something that we should aim for in Flink.
>
> I can understand that having Hive query syntax support in Flink could help
> users due to interoperability and being able to migrate. However:
>
> - Adding full Hive query syntax support will mean that we go from 6 fully
> supported API/language combinations to 7. I think we are currently already
> struggling with maintaining the existing combinations, let alone one
> more.
> - Apache Hive is/appears to be a project that's not that actively developed
> anymore. The last release was made in January 2021. Its popularity is
> rapidly declining in Europe and the United States, also due to Hadoop becoming
> less popular.
> - Related to the previous topic, other software like Snowflake,
> Trino/Presto, Databricks are becoming more and more popular. If we add full
> support for the Hive query syntax, then why not add support for Snowflake
> and the others?
> - We are supporting Hive versions that are no longer supported by the Hive
> community with known security vulnerabilities. This makes Flink also
> vulnerable to those types of vulnerabilities.
> - The current Hive implementation is done by using a lot of internals of
> Flink, making Flink hard to maintain, with lots of tech debt and making
> things overly complex.
>
> From my perspective, I think it would be better to not have Hive query
> syntax compatibility directly in Flink itself. Of course we should have a
> proper Hive connector and a proper Hive catalog to make connectivity with
> Hive (the versions that are still supported by the Hive community) itself
> possible. Alternatively, if Hive query syntax is so important, it should
> not rely on internals but be available as a dialect/pluggable option. That
> could also open up the possibility to add more syntax support for others in
> the future, but I really think we should just focus on Flink SQL itself.
> That's already hard enough to maintain and improve on.
>
> I'm looking forward to the thoughts of both Developers and Users, so I'm
> cross-posting to both mailing lists.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=165227316
> [2] https://issues.apache.org/jira/browse/FLINK-21529
>


[DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-07 Thread Martijn Visser
Hi everyone,

Flink currently has 4 APIs with multiple language support which can be used
to develop applications:

* DataStream API, both Java and Scala
* Table API, both Java and Scala
* Flink SQL, both in Flink query syntax and Hive query syntax (partially)
* Python API

Since FLIP-152 [1] the Flink SQL support has been extended to also support
the Hive query syntax. There is now a follow-up FLINK-26360 [2] to address
more syntax compatibility issues.

I would like to open a discussion on Flink directly supporting the Hive
query syntax. I have some concerns about whether 100% Hive query syntax
compatibility is indeed something that we should aim for in Flink.

I can understand that having Hive query syntax support in Flink could help
users due to interoperability and being able to migrate. However:

- Adding full Hive query syntax support will mean that we go from 6 fully
supported API/language combinations to 7. I think we are currently already
struggling with maintaining the existing combinations, let alone one
more.
- Apache Hive is/appears to be a project that's not that actively developed
anymore. The last release was made in January 2021. Its popularity is
rapidly declining in Europe and the United States, also due to Hadoop becoming
less popular.
- Related to the previous topic, other software like Snowflake,
Trino/Presto, Databricks are becoming more and more popular. If we add full
support for the Hive query syntax, then why not add support for Snowflake
and the others?
- We are supporting Hive versions that are no longer supported by the Hive
community with known security vulnerabilities. This makes Flink also
vulnerable to those types of vulnerabilities.
- The current Hive implementation is done by using a lot of internals of
Flink, making Flink hard to maintain, with lots of tech debt and making
things overly complex.

From my perspective, I think it would be better to not have Hive query
syntax compatibility directly in Flink itself. Of course we should have a
proper Hive connector and a proper Hive catalog to make connectivity with
Hive (the versions that are still supported by the Hive community) itself
possible. Alternatively, if Hive query syntax is so important, it should
not rely on internals but be available as a dialect/pluggable option. That
could also open up the possibility to add more syntax support for others in
the future, but I really think we should just focus on Flink SQL itself.
That's already hard enough to maintain and improve on.
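
For context, the user-facing entry point for Hive syntax today is the SQL
dialect switch on the table configuration. A minimal sketch (Java), assuming
the Hive connector dependencies and a HiveCatalog are set up separately:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HiveDialectSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Flink's own SQL dialect (the default).
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Switch to the (partially supported) Hive dialect for statements that
        // need it; in practice this also requires a registered HiveCatalog and
        // the Hive connector jars on the classpath.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // tEnv.executeSql("...");  // a Hive-syntax statement would go here
    }
}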

I'm looking forward to the thoughts of both Developers and Users, so I'm
cross-posting to both mailing lists.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=165227316
[2] https://issues.apache.org/jira/browse/FLINK-21529


Re: Shaded zookeeper - curator mismatch?

2022-03-07 Thread Filip Karnicki
Hi  Zhanghao

it's 3.5.5

Thank you
Fil

On Sat, 5 Mar 2022 at 08:12, Zhanghao Chen 
wrote:

> Hi Filip,
>
> Could you share the version of the ZK server you are connecting to?
>
>
> Best,
> Zhanghao Chen
> --
> *From:* Filip Karnicki 
> *Sent:* Friday, March 4, 2022 23:12
> *To:* user 
> *Subject:* Shaded zookeeper - curator mismatch?
>
> Hi, I believe there's a mismatch in shaded zookeeper/curator dependencies.
> I see that curator 4.2.0 needs zookeeper 3.5.4-beta, but it's still used in
> flink-shaded-zookeeper-34, which as far as I can tell is used by flink
> runtime 1.14.3
>
> https://mvnrepository.com/artifact/org.apache.flink/flink-runtime/1.14.3
>
>
> https://github.com/apache/flink-shaded/blob/release-14.0/flink-shaded-zookeeper-parent/flink-shaded-zookeeper-34/pom.xml
>
> I believe this might be causing an issue for us while running a StateFun
> uber jar in a Cloudera cluster (indeed, in ZooKeeper 3.4.x QuorumMaj only
> has a constructor that takes an int, not a Map).
>
> Am I understanding this correctly? Is this something that we need to fix
> with a patch?
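
As a quick check, a small reflection sketch (assuming the same shaded fat
jar is on the classpath) can list which constructors the bundled QuorumMaj
actually exposes; Curator 4.2's EnsembleTracker needs the Map-based
constructor that only exists in ZooKeeper 3.5+:

import java.lang.reflect.Constructor;

public class QuorumMajCheck {
    public static void main(String[] args) throws Exception {
        // Fully qualified name taken from the NoSuchMethodError below.
        Class<?> quorumMaj = Class.forName(
                "org.apache.flink.shaded.zookeeper3.org.apache.zookeeper."
                        + "server.quorum.flexible.QuorumMaj");
        for (Constructor<?> c : quorumMaj.getConstructors()) {
            // On a ZooKeeper 3.4.x shading only the (int) variant is printed.
            System.out.println(c);
        }
    }
}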
>
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:216)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:617)
> [fatty.fil.jar:?]
>
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
> [flink-dist_2.12-1.14.0-csa1.6.1.0.jar:1.14.0-csa1.6.1.0]
>
> Caused by: java.lang.NoSuchMethodError:
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V
>
> at
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:57)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:159)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:165)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:248)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:233)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:361)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:318)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193)
> ~[fatty.fil.jar:?]
>
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_212]
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
> ~[?:1.8.0_212]
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> ~[hadoop-common-3.1.1.7.1.7.74-6.jar:?]
>
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
> ~[fatty.fil.jar:?]
>
> ... 2 more
>
>
>
> Many thanks
>
> Fil
>