Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2020-10-09 Thread Matthias
Reviving this thread again after I came across FLINK-12214 [1] since there
are use cases which might benefit from this feature. Was there some
conclusion on public APIs in the meantime? Should we proceed with the
discussion here?

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-12214



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Fwd: Flink memory usage monitoring

2020-10-27 Thread Matthias Pohl
I missed adding the mailing list in my previous email.

-- Forwarded message -
From: Matthias Pohl 
Date: Tue, Oct 27, 2020 at 12:39 PM
Subject: Re: Flink memory usage monitoring
To: Rajesh Payyappilly Jose 


Hi Rajesh,
thanks for reaching out to us. We worked on providing metrics for managed
memory and network memory as part of FLIP-102 [1]. It looks like these
features are going to be added to the upcoming release of Flink 1.12.

We decided to not include off-heap memory as it is not necessarily under
control of Flink (e.g. user code can allocate native memory and Flink
wouldn't be aware of it). Hence, providing numbers for off-heap memory
usage might be misleading. There will be a metric to monitor the Metaspace
usage, though.

Best,
Matthias

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A+Add+More+Metrics+to+TaskManager
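
For illustration, once such metrics are exposed, a single value can be polled through the JobManager's REST API. The sketch below is only an assumption-laden example: host, port, the TaskManager id and the exact metric name (Status.JVM.Memory.Metaspace.Used here) should be verified against GET /taskmanagers and GET /taskmanagers/<id>/metrics on your own cluster.

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PollTaskManagerMetric {
    public static void main(String[] args) throws Exception {
        // Placeholders: REST address of the JobManager and a TaskManager id
        // (ids can be looked up via GET /taskmanagers).
        String restAddress = "http://localhost:8081";
        String taskManagerId = args[0];

        URL url = new URL(restAddress + "/taskmanagers/" + taskManagerId
                + "/metrics?get=Status.JVM.Memory.Metaspace.Used");

        // The endpoint returns a small JSON array of {"id": ..., "value": ...} entries.
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            System.out.println(in.readLine());
        }
    }
}
```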

On Tue, Oct 20, 2020 at 8:23 PM Rajesh Payyappilly Jose 
wrote:

> Classification: *Internal*
>
> Hi,
>
>
>
> Environment - Flink 1.11 on K8s
>
>
>
> Is there a way to monitor the usage of managed memory, off-heap memory and
> network memory?
>
>
>
> -Rajesh
>
>


Re: Kubernetes Job Cluster, does it autoterminate?

2020-10-28 Thread Matthias Pohl
Hi Ruben,
thanks for reaching out to us. Flink's native Kubernetes Application mode
[1] might be what you're looking for.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application

On Wed, Oct 28, 2020 at 11:50 AM Ruben Laguna 
wrote:

> Hi,
>
> First time user , I'm just evaluating Flink at the moment, and I was
> reading
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#deploy-job-cluster
> and I don't fully understand if a Job Cluster will autoterminate after
> the job is completed (for a batch job)?
>
> The examples look to me like the task manager pods will continue
> running, as they're configured as a Deployment.
>
> So is there any way to achieve "autotermination" or am I supposed to
> monitor the job status externally (like from airflow) and delete the
> JobManager and TaskManager kubernetes resources from there?
>
> --
> /Rubén Laguna
>


Re: Running flink in a Local Execution Environment for Production Workloads

2020-10-29 Thread Matthias Pohl
Hi Joseph,
thanks for reaching out to us. There shouldn't be any downsides other than
the one you already mentioned as far as I know.

Best,
Matthias

On Fri, Oct 23, 2020 at 1:27 PM Joseph Lorenzini 
wrote:

> Hi all,
>
>
>
> I plan to run flink jobs as docker containers in an AWS Elastic Container
> Service. I will have checkpointing enabled where state is stored in a s3
> bucket. Each deployment will run in a per-job mode.  Are there any
> non-obvious downsides to running these jobs with a local execution
> environment so that the deployment turns into deploying a single java
> application?
>
>
>
> The obvious downside is that you don’t get any horizontal scalability.
> That’s a given and I’d have to scale up not out in this mode. I’d like to
> discover if there are any other negatives with this approach.
>
>
>
> Thanks,
> Joe
>


Re: a couple of memory questions

2020-11-05 Thread Matthias Pohl
Hello Edward,
please find my answers within your message below:

On Wed, Nov 4, 2020 at 1:35 PM Colletta, Edward 
wrote:

> Using Flink 1.9.2 with FsStateBackend, Session cluster.
>
>
>
>1. Does heap state get cleaned up when a job is cancelled?
>
> We have jobs that we run on a daily basis.  We start each morning and
> cancel each evening.  We noticed that the process size does not seem to
> shrink.  We are looking at the resident size of the process with ps and
> also the USED column for Heap on the taskmanager page of the flink
> dashboard.
>
There is no explicit cleanup happening on the Flink side. The heap should
be cleaned up when GC kicks in.

>
>1. How can I examine the usage of Flink Managed Memory?
>
>  The configuration documentation seems to indicate this is used for batch
> jobs, and we are only using the Streaming API.   I reduced 
> taskmanager.memory.fraction
> to 0.3, but I think this is still reserving too much memory for an area we
> will not be using.
>
Unfortunately, I don't know of any way to monitor the managed memory for
Flink 1.9.2 as is. We're going to introduce new metrics for managed memory
[1], network memory [2] and metaspace [3] in the upcoming release of Flink
1.12.0. This should make it easier to monitor these memory pools.

I hope that helps a bit.
Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-14406
[2] https://issues.apache.org/jira/browse/FLINK-14422
[3] https://issues.apache.org/jira/browse/FLINK-19617


Re: Job crash in job cluster mode

2020-11-10 Thread Matthias Pohl
Hi Tim,
I'm not aware of any memory-related issues being related to the deployment
mode used. Have you checked the logs for hints? Additionally, you could try
to extract a heap dump. That might help you in analyzing the cause of the
memory consumption.

The TaskManager and JobManager are logging the effective memory-related
configuration during startup. You can look out for the "Preconfiguration"
section in each of the log files to get a drill-down of how much memory is
used per memory pool.

Best,
Matthias

On Tue, Nov 10, 2020 at 3:37 PM Tim Eckhardt 
wrote:

> Hi there,
>
>
>
> I have a problem with running a flink job in job cluster mode using flink
> 1.11.1 (also tried 1.11.2).
>
> The same job is running well using the session cluster mode as well as
> using flink 1.10.0 in job cluster mode.
>
>
>
> The job starts running and is running for quite some time but it runs a
> lot slower than in session cluster mode and crashes after running for about
> an hour. I can observe in the flink dashboard that the JVM heap is constant
> at a high level and is getting slowly closer to the limit (4.13GB in my
> case) which it reaches close to the job crashing.
>
> There is also some G1_Old_Generation garbage collection going on, which I
> do not observe in session cluster mode.
>
>
>
> GC values after running for about 45min:
>
>
>
> (Collector, Count, Time)
>
> G1_Young_Generation   1,250  107,937
>
> G1_Old_Generation  322  2,432,362
>
>
>
> Compared to the GC values of the same job in session cluster mode (after
> the same runtime):
>
>
>
> G1_Young_Generation   1,920  20,575
>
> G1_Old_Generation  0  0
>
>
>
> So my vague guess is that it has to be something memory related maybe
> configuration wise.
>
>
>
> To simplify the setup only one jobmanager and one taskmanager is used. The
> taskmanager has a memory setting of: taskmanager.memory.process.size:
> 1m which should be totally fine for the server. The jobmanager has a
> defined heap_size of 1600m.
>
>
>
> Maybe somebody has experienced something like this before?
>
>
>
> Also is there a way to export the currently loaded configuration
> parameters of the job- and taskmanagers in a cluster? For example I can’t
> see the current memory process size of the taskmanager in the flink
> dashboard. Because this way I could compare the running and crashing setups
> more easily (using docker and environment variables for configuration at
> the moment which makes it a bit harder to debug).
>
>
>
> Thanks.
>


Re: Caching Mechanism in Flink

2020-11-11 Thread Matthias Pohl
Hi Iacovos,
The task's off-heap configuration value is used when spinning up
TaskManager containers in a clustered environment. It will contribute to
the overall memory reserved for a TaskManager container during deployment.
This parameter can be used to influence the amount of memory allocated if
the user code relies on DirectByteBuffers and/or native memory allocation.
There is no active memory pool management beyond that from Flink's side.
The configuration parameter is ignored if you run a Flink cluster locally.

Besides this, Flink itself also utilizes the JVM's direct memory via
DirectByteBuffers (for network buffers) and native memory (through Flink's
internally used managed memory).

You can find a more detailed description of Flink's memory model in [1]. I
hope that helps.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model

On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis 
wrote:

> Thank you Xuannan for the reply.
>
> Also I want to ask about how Flink uses the off-heap memory. If I set
> taskmanager.memory.task.off-heap.size then which data does Flink allocate
> off-heap? Is this handled by the programmer?
>
> Best,
> Iacovos
> On 10/11/20 4:42 π.μ., Xuannan Su wrote:
>
> Hi Jack,
>
> At the moment, Flink doesn't support caching the intermediate result.
> However, there is some ongoing effort to support caching in Flink.
> FLIP-36 [1] proposes to add the caching mechanism at the Table API. And it
> is planned for 1.13.
>
> Best,
> Xuannan
>
> On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis ,
> wrote:
>
> Hello all,
>
> I am new to Flink and I want to ask if Flink supports a caching
> mechanism to store intermediate results in memory for machine learning
> workloads.
>
> If yes, how can I enable it and how can I use it?
>
> Thank you,
> Iacovos
>
>


Re: SSL setup for YARN deployment when hostnames are unknown.

2020-11-11 Thread Matthias Pohl
Hi Jiahui,
thanks for reaching out to the mailing list. This is not something I have
expertise in. But have you checked out the Flink SSL Setup documentation
[1]? Maybe, you'd find some help there.

Additionally, I did go through the code a bit: A SecurityContext is loaded
during ClusterEntrypoint startup [2]. It supports dynamic loading of
security modules. You might have to implement
org.apache.flink.runtime.security.contexts.SecurityContextFactory and
configure it in your flink-conf.yaml. Is this something that might help
you? I'm adding Aljoscha to this thread as he worked on dynamically loading
these modules recently.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html
[2]
https://github.com/apache/flink/blob/2c8631a4eb7a247ce8fb4205f838e8c0f8019367/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L170

On Wed, Nov 11, 2020 at 6:17 AM Jiahui Jiang 
wrote:

> Ping on this 🙂  Is there any way I can run a script or implement some
> interface to run before the Dispatcher service starts up to dynamically
> generate the keystore?
>
> Thank you!
> --
> *From:* Jiahui Jiang 
> *Sent:* Monday, November 9, 2020 3:19 PM
> *To:* user@flink.apache.org 
> *Subject:* SSL setup for YARN deployment when hostnames are unknown.
>
> Hello Flink!
>
> We are working on turning on REST SSL for YARN deployments. We built a
> generic orchestration server that can submit Flink clusters to any YARN
> clusters given the relevant Hadoop configs. But this means we may not know
> the hostname the Job Managers can be deployed onto - not even through wild
> card DNS names
> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html#tips-for-yarn--mesos-deployment>
> as recommended in the documentation.
>
> I’m wondering is there any factory class that I can implement that can
> allow me to generate a private key and import that to JM’s keystore at
> runtime?
> Or is there any other recommended way to handle the cases where we don’t
> know the potential JM hosts at all?
>
> Thank you!
>
>


Re: Caching Mechanism in Flink

2020-11-11 Thread Matthias Pohl
When talking about the "off-heap" in your most recent message, are you
still referring to the task's off-heap configuration value? AFAIK,
the HybridMemorySegment shouldn't be directly related to the off-heap
parameter.

The HybridMemorySegment can be used as a wrapper around any kind of
memory. It can be used for heap memory (a byte[]) as well as for
DirectByteBuffers (located in the JVM's direct memory pool, which is not part
of the JVM's heap) or memory allocated through Unsafe's allocation methods
(so-called native memory, which is also not part of the JVM's heap).
The HybridMemorySegments are utilized within the MemoryManager class. The
MemoryManager instances are responsible for maintaining the managed memory
used in each of the TaskSlots. Managed Memory is used in different settings
(e.g. for the RocksDB state backend in streaming applications). It can be
configured using taskmanager.memory.managed.size (or the corresponding
*.fraction parameter) [1]. See more details on that in [2].

I'm going to pull in Andrey as he has worked on that topic recently.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory

On Wed, Nov 11, 2020 at 12:00 PM Jack Kolokasis 
wrote:

> Hi Matthias,
>
> Thank you for your reply and useful information. I found that off-heap
> memory is used when Flink uses HybridMemorySegments. How does Flink know
> when to use these HybridMemorySegments, and in which operations does this
> happen?
>
> Best,
> Iacovos
> On 11/11/20 11:41 π.μ., Matthias Pohl wrote:
>
> Hi Iacovos,
> The task's off-heap configuration value is used when spinning up
> TaskManager containers in a clustered environment. It will contribute to
> the overall memory reserved for a TaskManager container during deployment.
> This parameter can be used to influence the amount of memory allocated if
> the user code relies on DirectByteBuffers and/or native memory allocation.
> There is no active memory pool management beyond that from Flink's side.
> The configuration parameter is ignored if you run a Flink cluster locally.
>
> Besides this, Flink itself also utilizes the JVM's direct memory via
> DirectByteBuffers (for network buffers) and native memory (through Flink's
> internally used managed memory).
>
> You can find a more detailed description of Flink's memory model in [1]. I
> hope that helps.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model
>
> On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis 
> wrote:
>
>> Thank you Xuannan for the reply.
>>
>> Also I want to ask about how Flink uses the off-heap memory. If I set
>> taskmanager.memory.task.off-heap.size then which data does Flink allocate
>> off-heap? Is this handled by the programmer?
>>
>> Best,
>> Iacovos
>> On 10/11/20 4:42 π.μ., Xuannan Su wrote:
>>
>> Hi Jack,
>>
>> At the moment, Flink doesn't support caching the intermediate result.
>> However, there is some ongoing effort to support caching in Flink.
>> FLIP-36 [1] proposes to add the caching mechanism at the Table API. And it
>> is planned for 1.13.
>>
>> Best,
>> Xuannan
>>
>> On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis ,
>> wrote:
>>
>> Hello all,
>>
>> I am new to Flink and I want to ask if Flink supports a caching
>> mechanism to store intermediate results in memory for machine learning
>> workloads.
>>
>> If yes, how can I enable it and how can I use it?
>>
>> Thank you,
>> Iacovos
>>
>>


Re: Data loss exception using hash join in batch mode

2020-11-13 Thread Matthias Pohl
Hi 键,
we would need more context on your case (e.g. logs and more details on what
you're doing exactly or any other useful information) to help.

Best,
Matthias

On Thu, Nov 12, 2020 at 3:25 PM 键 <1941890...@qq.com> wrote:

> Data loss exception using hash join in batch mode
>


Re: Job is still in ACTIVE state after /jobs/:jobid/stop

2020-11-13 Thread Matthias Pohl
Hi Averell,
thanks for sharing this with the Flink community. Is there anything
suspicious in the logs which you could share?

Best,
Matthias

On Fri, Nov 13, 2020 at 2:27 AM Averell  wrote:

> I have some updates. Some weird behaviours were found. Please refer to the
> attached photo.
>
> All requests were sent via REST API
>
> The status of the savepoint triggered by that stop request (ID 11018) is
> "COMPLETED [Savepoint]", however, no checkpoint data has been persisted (in
> S3).
> The folder `savepoint-5871af-c0f2d2334501/_metadata/` has been created in
> S3, but no files in that.
> This was the command I used to send the first stop request:
> curl -s -d '{"drain": false,
> "targetDirectory":"s3://mybucket/savepoint"}' -H 'Content-Type:
> application/json' -X POST
> http://myip:45507/jobs/5871af88ff279f30ebcc49ce741c2d75/stop
>
> Suspected that s3:// might be the issue, I tried to send another stop
> request (ID 11020), mistakenly having the path as s3s://. So it failed.
>
> Another stop request was sent (ID 11021). This one failed after timeout (10
> minutes). The GUI says the checkpoint failed with `Checkpoint expired
> before completing`.
> curl -s -d '{"drain": false,
> "targetDirectory":"s3a://mybucket/savepoint"}' -H 'Content-Type:
> application/json' -X POST
> http://myip:45507/jobs/5871af88ff279f30ebcc49ce741c2d75/stop
>
> I tried to send a create-savepoint request (ID 11023), and this time, it
> completed successfully, with files persisted to S3. Checking Flink GUI I
> could see that the job actually resumed before that savepoint request (with
> the checkpoint ID 11021 created just 30 seconds after 11021 expired).
> curl -s -d '{"target-directory":"s3a://mybucket/savepoint", "cancel-job":
> false}' -H 'Content-Type: application/json' -X POST
> http://myip:45507/jobs/5871af88ff279f30ebcc49ce741c2d75/savepoints
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2020-11-13_at_11.png>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Logs of JobExecutionListener

2020-11-13 Thread Matthias Pohl
Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the
following questions, please:
- What's the code of your Job's main method?
- What cluster backend and application do you use to execute the job?
- Is there anything suspicious you can find in the logs that might be
related?

Best,
Matthias

On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier 
wrote:

> Actually what I'm experiencing is that the JobListener is executed
> successfully if I run my main class from the IDE, while the job listener is
> not fired at all if I submit the JobGraph of the application to a cluster
> using the RestClusterClient..
> Am I doing something wrong?
>
> My main class ends with the env.execute() and i do
> env.registerJobListener() when I create the Exceution env
> via ExecutionEnvironment.getExecutionEnvironment().
>
> Thanks in advance for any help,
> Flavio
>
> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier 
> wrote:
>
>> Hello everybody,
>> I'm trying to use the JobListener to track when a job finishes (with
>> Flink 1.11.0).
>> It works great but I have the problem that logs inside the onJobExecuted
>> are not logged anywhere... Is that normal?
>>
>> Best,
>> Flavio
>>
>
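
For reference, a minimal sketch of registering a JobListener on the execution environment (Flink 1.11-style API). As far as I understand, the callbacks run in the client process that calls execute(), so their log output is governed by the client's logging configuration rather than the cluster's.

```
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobListenerExample {

    private static final Logger LOG = LoggerFactory.getLogger(JobListenerExample.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // jobClient is null if the submission failed (throwable is set in that case)
                LOG.info("Job submitted: {}", jobClient == null ? "n/a" : jobClient.getJobID());
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                LOG.info("Job executed, net runtime: {} ms",
                        result == null ? -1 : result.getNetRuntime());
            }
        });

        env.fromElements(1, 2, 3).print();
        env.execute("job-listener-example");
    }
}
```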


Re: Filter By Value in List

2020-11-13 Thread Matthias Pohl
Hi Rex,
after verifying with Timo I created a new issue to address your proposal of
introducing a new operator [1]. Feel free to work on that one if you like.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-20148
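
In the meantime, the UDF workaround Timo suggests further down in this thread could look roughly like the sketch below (Java Table API syntax; the function name ARRAY_CONTAINS and its registration are made up for illustration, not an existing built-in).

```
import org.apache.flink.table.functions.ScalarFunction;

// Sketch of a scalar UDF that checks whether an ARRAY<STRING> column contains a value.
public class ArrayContains extends ScalarFunction {

    public boolean eval(String[] haystack, String needle) {
        if (haystack == null || needle == null) {
            return false;
        }
        for (String candidate : haystack) {
            if (needle.equals(candidate)) {
                return true;
            }
        }
        return false;
    }
}

// Registration and usage (Flink 1.11+, Java Table API; the function name is arbitrary):
//   tableEnv.createTemporarySystemFunction("ARRAY_CONTAINS", ArrayContains.class);
//   table.filter(call("ARRAY_CONTAINS", $("fruits"), lit("apple")));
```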

On Thu, Nov 5, 2020 at 6:35 PM Rex Fenley  wrote:

> Thanks Timo,
>
> Checking if an element is in an Array does seem like a very useful
> function to have. Is there any plan to add it?
>
> Thanks
>
> On Thu, Nov 5, 2020 at 7:26 AM Timo Walther  wrote:
>
>> Hi Rex,
>>
>> as far as I know, the IN operator only works on tables or a list of
>> literals where the latter one is just a shortcut for multiple OR
>> operations. I would just go with a UDF for this case. In SQL you could
>> do an UNNEST to convert the array into a table and then use the IN
>> operator. But I'm not sure if this is a better solution.
>>
>> Regards,
>> Timo
>>
>>
>>
>> On 04.11.20 01:13, Rex Fenley wrote:
>> > None of the following appear to work either. Flink 1.11.2, Scala 2.12.
>> >
>> > table.filter("apple".in(List("apple")))
>> > [info]   org.apache.flink.table.api.ValidationException: IN operator on
>> > incompatible types: String and ObjectArrayTypeInfo.
>> >
>> > table.filter("apple".in(java.util.Arrays.asList("apple")))
>> > [info]   org.apache.flink.table.api.ValidationException: IN operator on
>> > incompatible types: String and ObjectArrayTypeInfo.
>> >
>> > table.filter(
>> > "apple".in(newju.ArrayList[String](java.util.Arrays.asList("apple")))
>> > )
>> > [info]   org.apache.flink.table.api.ValidationException: IN operator on
>> > incompatible types: String and ObjectArrayTypeInfo.
>> >
>> >
>> > On Tue, Nov 3, 2020 at 2:32 PM Rex Fenley <r...@remind101.com> wrote:
>> >
>> > Using a custom serializer to make sure I'm using a List does
>> > not help.
>> >
>> > [info]   org.apache.flink.table.api.ValidationException: IN operator
>> > on incompatible types: String and List.
>> >
>> > On Tue, Nov 3, 2020 at 12:44 PM Rex Fenley <r...@remind101.com> wrote:
>> >
>> > For clarification, I'm using Pojo and operating on a column of
>> > this type
>> > public java.util.List<String> fruits
>> >
>> > adding the following annotation does not help
>> > @DataTypeHint("ARRAY<STRING>")
>> >
>> > On Mon, Nov 2, 2020 at 7:02 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>> >
>> > I believe this is happening because the type system does not
>> > recognize
>> > that list of Strings as anything special but treats it as a
>> > black-box type.
>> >
>> > @Timo: Would this work with the new type system?
>> >
>> > Best,
>> > Aljoscha
>> >
>> > On 02.11.20 06:47, Rex Fenley wrote:
>> >  > Hello,
>> >  >
>> >  > I'm trying to filter the rows of a table by whether or
>> > not a value exists
>> >  > in an array column of a table.
>> >  > Simple example:
>> >  > table.where("apple".in($"fruits"))
>> >  >
>> >  > In this example, each row has a "fruits" Array
>> > column that could
>> >  > have 1 or many fruit strings which may or may not be
>> "apple".
>> >  >
>> >  > However, I keep receiving the following error when I do
>> > something similar
>> >  > to the example above:
>> >  > "IN operator on incompatible types: String and
>> > GenericType"
>> >  >
>> >  > Is there any way to accomplish this?
>> >  >
>> >  > Thanks!
>> >  >
>> >
>> >
>> >
>> > --
>> >
>> > Rex Fenley|Software Engineer - Mobile and Backend
>> >
>> >
>> > Remind.com <https://www.remind.com/>| BLOG
>> > <http://blog.remind.com/> | FOLLOW US
>> > <https://twitter.com/remindhq> | LIKE US
>> > <https://www.facebook.com/remindhq>
>> >
>> >
>> >
>> > --
>> >
>> > Rex Fenley|Software Engineer - Mobile and Backend
>> >
>> >
>> > Remind.com <https://www.remind.com/>| BLOG
>> > <http://blog.remind.com/> | FOLLOW US
>> > <https://twitter.com/remindhq> | LIKE US
>> > <https://www.facebook.com/remindhq>
>> >
>> >
>> >
>> > --
>> >
>> > Rex Fenley|Software Engineer - Mobile and Backend
>> >
>> >
>> > Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
>> > FOLLOW US <https://twitter.com/remindhq> | LIKE US
>> > <https://www.facebook.com/remindhq>
>> >
>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


Re: Assistance configuring access to GoogleCloudStorage for row format streaming file sink

2020-11-13 Thread Matthias Pohl
Hi,
thanks for reaching out to the Flink community. The tricky thing here is
that the Google Cloud Storage connector is not supported by Flink's plugin
system as stated in [1]. There is a blog post on how to get started with
Flink on Google's Cloud Platform [2]. In case you haven't seen that one,
yet: There is a subsection "Advanced: Set up access to Google Cloud Storage
for checkpoints and savepoints." describing the old way of adding support
for file systems specifically show-casing the GCP Storage. There you're
asked to copy the connector into Flink's lib/ directory, instead. In
addition to that, you have to add the Hadoop dependencies to the lib/
folder as well. For this, it's advisable to use a bundled Hadoop lib
provided by the Flink community [3] to avoid name clashes on the classpath.

I hope this helps.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#hadoop-file-system-hdfs-and-its-other-implementations
[2]
https://www.ververica.com/blog/getting-started-with-da-platform-on-google-kubernetes-engine
[3] https://flink.apache.org/downloads.html#additional-components

On Fri, Nov 13, 2020 at 2:32 PM orionemail 
wrote:

> Hi,
>
> I am running flink 1.10.1 initially on my local development machine -
> Macbook Pro.  I'm struggling to understand how to write to Google Cloud
> storage using the StreamingfileSink  (S3 works fine).
>
> There error I am seeing:
>
> "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'gs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:117)"
>
>
> I have put the gcs-connector-hadoop2-latest.jar in a subdir in plugins/
>
> plugins
> ├── gcs-connector
> │   └── gcs-connector-hadoop2-latest.jar
>
> In flink-yaml.conf I have added:
>
> fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
> google.cloud.auth.service.account.enable: true
> google.cloud.auth.service.account.json.keyfile: ~/key.json
>
> This mirrors the setup I used for s3 storage.
>
> My implementation is a simple test reading data from a kinesis stream and
> outputing to gcp.
>
> DataStream<String> input = getKinesisSource(env, kinesisStream);
>
> final StreamingFileSink<String> sink = StreamingFileSink
> .forRowFormat(new Path("gs://some-gcp-bucket"), new
> SimpleStringEncoder<String>("UTF-8"))
> .withRollingPolicy(
> DefaultRollingPolicy.builder()
> .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
> .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
> .withMaxPartSize(1024 * 1024 * 1024)
> .build())
> .build();
>
> //input.print();
> input.addSink(sink);
>
>
> Not sure what else to try.  Any pointers appreciated.
>
>
>
> Sent with ProtonMail <https://protonmail.com> Secure Email.
>
>

-- 

Matthias Pohl | Engineer

Follow us @VervericaData Ververica <https://www.ververica.com/>

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner


Re: Is possible that make two operators always locate in same taskmanager?

2020-11-13 Thread Matthias Pohl
Hi Si-li,
trying to answer your initial question: Theoretically, you could try using
the co-location constraints to achieve this. But keep in mind that this
might lead to multiple Join operators running in the same JVM reducing the
amount of memory each operator can utilize.

Best,
Matthias
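
To illustrate, a rough sketch of what such a constraint could look like when set on the underlying transformations. Note that getTransformation()/setCoLocationGroupKey() are internal APIs rather than stable public ones and may differ between versions; as far as I understand, the operators also need matching parallelism and the same slot sharing group for the constraint to take effect.

```
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CoLocationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the "consumer" and "join" operators from the discussion.
        SingleOutputStreamOperator<Long> consumerSide =
                env.fromElements(1L, 2L, 3L).map(value -> value).returns(Types.LONG);
        SingleOutputStreamOperator<Long> joinSide =
                consumerSide.map(value -> value * 2).returns(Types.LONG);

        // Same co-location group key => subtask i of both operators should end up
        // in the same slot and hence in the same TaskManager JVM.
        consumerSide.getTransformation().setCoLocationGroupKey("consumer-and-join");
        joinSide.getTransformation().setCoLocationGroupKey("consumer-and-join");

        joinSide.print();
        env.execute("co-location sketch");
    }
}
```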

On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:

> Thanks for your reply.
>
> It's a streaming job. The join operator is doing the join work. The join
> state is too large, so I don't want to keep the state using the mechanism
> that Flink provides, and I also don't need a very precise join. So I prefer
> to let the join operator calculate a backward timestamp as state; if the
> cluster restarts, the consumer can use setStartFromTimestamp to start from
> that timestamp.
>
> Now my problem is that the consumer can't read the state that the join
> operator has written, so I need a way to send a small message (a 64-bit
> long) from downstream to upstream. Redis may be a solution, but adding an
> external dependency is only a secondary option if I can pass this message
> through memory.
>
>
> Chesnay Schepler  于2020年11月6日周五 上午7:06写道:
>
>> It would be good if you could elaborate a bit more on your use-case.
>> Are you using batch or streaming? What kind of "message" are we talking
>> about? Why are you thinking of using a static variable, instead of just
>> treating this message as part of the data(set/stream)?
>>
>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>
>> Currently I use Flink 1.9.1. The actual thing I want to do is send some
>> messages from downstream operators to upstream operators, which I consider
>> doing via a static variable.
>>
>> But that makes me have to make sure that one taskmanager process always
>> hosts these two operators. Can I use CoLocationGroup to solve this problem?
>> Or can anyone give me an example to demonstrate the usage of
>> CoLocationGroup?
>>
>> Thanks!
>> --
>> Best regards
>>
>> Sili Liu
>>
>>
>>
>
> --
> Best regards
>
> Sili Liu
>


Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Matthias Pohl
Hi Fuyao,
for your first question about the different behavior depending on whether
you chain the methods or not: Keep in mind that you have to save the return
value of the assignTimestampsAndWatermarks method call if you don't chain
the methods together as it is also shown in [1].
At least the following example from your first message is indicating it:
```
retractStream.assignTimestampsAndWatermarks(new
BoRetractStreamTimestampAssigner()); (This is a deprecated method)
// instead of: retractStream =
retractStream.assignTimestampsAndWatermarks(new
BoRetractStreamTimestampAssigner());
retractStream
.keyBy()
.process(new TableOutputProcessFunction())
.name("ProcessTableOutput")
.uid("ProcessTableOutput")
.addSink(businessObjectSink)
.name("businessObjectSink")
.uid("businessObjectSink")
.setParallelism(1);
```

For your second question about setting the EventTime I'm going to pull in
Timo from the SDK team as I don't see an issue with your code right away.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies

On Wed, Nov 4, 2020 at 10:16 PM Fuyao Li  wrote:

> Hi Flink Users and Community,
>
> For the first part of the question, the 12 hour time difference was caused
> by a time extraction bug on my side. I can get the time translated correctly
> now. The type cast problem has some workarounds to solve it.
>
> My major blocker right now is that the onTimer part is not properly
> triggered. I guess it is caused by failing to configure the correct
> watermarks & timestamp assigners. Please give me some insights.
>
> 1. If I don't chain the assignTimestampsAndWatermarks() method together
> with the keyBy() and process() methods, the context.timestamp() in my
> processElement() function will be null. Is this expected behavior? The
> Flink examples didn't chain it together. (see example here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
> )
> 2. If I use registerEventTimeTimer() in processElement(), the onTimer
> method will not be triggered. However, I can trigger the onTimer method if
> I simply change it to registerProcessingTimeTimer(). I am using the
> settings below in the stream env.
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000L);
>
> My code for method the process chain:
> retractStream
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
> .withTimestampAssigner((booleanRowTuple2,
> timestamp) -> {
> Row rowData = booleanRowTuple2.f1;
> LocalDateTime headerTime =
> (LocalDateTime)rowData.getField(3);
> LocalDateTime linesTime =
> (LocalDateTime)rowData.getField(7);
>
> LocalDateTime latestDBUpdateTime = null;
> if (headerTime != null && linesTime != null) {
> latestDBUpdateTime =
> headerTime.isAfter(linesTime) ? headerTime : linesTime;
> }
> else {
> latestDBUpdateTime = (headerTime != null)
> ? headerTime : linesTime;
> }
> if (latestDBUpdateTime != null) {
> return
> latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
> }
> // In the worst case, we use system time
> instead, which should never be reached.
> return System.currentTimeMillis();
> }))
> //  .assignTimestampsAndWatermarks(new MyWaterStrategy())  //
> second way to create watermark, doesn't work
> .keyBy(value -> {
> // There could be null fields for header invoice_id
> field
> String invoice_id_key = (String)value.f1.getField(0);
> if (invoice_id_key == null) {
> invoice_id_key = (String)value.f1.getField(4);
> }
> return invoice_id_key;
> })
> .process(new TableOutputProcessFunction())
> .name("ProcessTableOutput")
> .uid("ProcessTableOutput")
> .addSink(businessObjectSink)
> .name("businessObjectSink")
>

Re: Batch compressed file output

2020-11-27 Thread Matthias Pohl
Hi Flavio,
others might have better ideas to solve this but I'll give it a try: Have
you considered extending FileOutputFormat to achieve what you need? That
approach (which is discussed in [1]) sounds like something you could do.
Another pointer I want to give is the DefaultRollingPolicy [2]. It looks
like it partially does what you're looking for. I'm adding Kostas to this
conversation as he worked on the RollingPolicy. Maybe, he has some more
insights.

I hope that helps.

Best,
Matthias

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/output-writer-td2296.html
[2]
https://github.com/apache/flink/blob/5ff96966b59e0d9a7b55ebae9e252b1c9aafd4ea/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java#L40
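
To make the first pointer a bit more concrete, here is a rough, untested sketch of a gzipping FileOutputFormat. It assumes the protected stream field opened by FileOutputFormat#open; splitting the output by line count or size would still need additional bookkeeping in writeRecord.

```
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.core.fs.Path;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Untested sketch: gzip every line written through this output format.
public class GzipTextOutputFormat extends FileOutputFormat<String> {

    private transient GZIPOutputStream gzipOut;

    public GzipTextOutputFormat(Path outputPath) {
        super(outputPath);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks); // opens this.stream for the subtask's output file
        this.gzipOut = new GZIPOutputStream(this.stream);
    }

    @Override
    public void writeRecord(String record) throws IOException {
        gzipOut.write(record.getBytes(StandardCharsets.UTF_8));
        gzipOut.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (gzipOut != null) {
            gzipOut.finish(); // flush the gzip trailer before the underlying stream is closed
        }
        super.close();
    }
}
```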

On Fri, Nov 27, 2020 at 11:07 AM Flavio Pompermaier 
wrote:

> Hello guys,
> I have to write my batch data (Dataset) to a file format. Actually
> what I need to do is:
>
>1. split the data if it exceeds some size threshold  (by line count or
>max MB)
>2. compress the output data (possibly without converting to the hadoop
>format)
>
> Are there any suggestions / recommendations about that?
>
> Best,
> Flavio
>


Re: Flink jobmanager TLS connectivity to Zookeeper

2020-12-10 Thread Matthias Pohl
Hi Azeem,
I haven't worked with Flink's SSL support, yet. But have you taken a look
at the SSL configuration options listed under [1]?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/security/security-ssl.html#complete-list-of-ssl-options

On Tue, Dec 8, 2020 at 8:01 PM Azeem Mufti  wrote:

>  I'm trying to figure out a way to make Flink jobmanager (in HA) connect
> to zookeeper over SSL/TLS. It doesn't seem like there are native properties
> like Kafka has that support this interaction yet. Is this true or is there
> some way that I can go about doing this?
>
>

-- 

Matthias Pohl | Engineer

Follow us @VervericaData Ververica <https://www.ververica.com/>

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner


Re: Accumulators storage path

2020-12-10 Thread Matthias Pohl
Hi Hanan,
thanks for reaching out to the Flink community. Have you considered
changing io.tmp.dirs [1][2]?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#io-tmp-dirs
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/cluster_setup.html#configuring-flink

On Thu, Dec 10, 2020 at 10:24 AM Hanan Yehudai 
wrote:

> I am having all the Accumulators store their data on /tmp, as this is
> the default.
> When running on Docker, this is mapped to my VM’s “/” partition.
> A lot of accumulators fill up the disk => pods are evicted.
>
>
>
> Is there a way to set the Accumulators persistence to a different path
> than /tmp?
> – one that I can mount to a different partition on my VM?
>
>
>
> Flink 1.9 is used
>
>
>


Re: Putting record on kinesis (actually kinesalite) results with The security token included in the request is invalid.

2020-12-10 Thread Matthias Pohl
Hi Avi,
thanks for reaching out to the Flink community. I haven't worked with the
KinesisConsumer. Unfortenately, I cannot judge whether there's something
missing in your setup. But first of all: Could you confirm that the key
itself is valid? Did you try to use it in other cases?

Best,
Matthias

On Thu, Dec 10, 2020 at 12:48 PM Avi Levi  wrote:

> Hi ,
> Any help here will be greatly appreciated I am about to throw the towel, very 
> frustrating...
> I am trying to put record on kinesalite with the following configuration :
>
> System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
>  "true")
>   System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, 
> "true")
>   System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
>   
> System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,"true")
>
>   val producerConfig = new Properties()
>   producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>   producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
>   producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
>   producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
>   producerConfig.put( "VerifyCertificate", "false")
>
> However putting a record on the stream :
>
>   val producer = new FlinkKinesisProducer(new SimpleStringSchema(), 
> producerConfig)
>   producer.setFailOnError(true)
>   producer.setDefaultStream(outputStreamName)
>   producer.setDefaultPartition("0")
>
>   val kinesis =
> env.addSource(new FlinkKinesisConsumer[String](
>   inputStreamName,new SimpleStringSchema, consumerConfig))
>   .addSink(producer)
>
> yields:
>
> Exception name: UnrecognizedClientException
> Error message: The security token included in the request is invalid.
> 6 response headers:
> connection : close
> content-length : 107
> content-type : application/x-amz-json-1.1
>
>
> ➜  ~ cat ~/.aws/credentials
> [default]
> aws_access_key_id = x
> aws_secret_access_key = x
> region = us-east-1
>


Re: Putting record on kinesis (actually kinesalite) results with The security token included in the request is invalid.

2020-12-11 Thread Matthias Pohl
True, I got this wrong. Do you have any reason to assume that it's a Flink
issue? The configuration looks correct (relying on the Flink docs [1]
here). Have you considered asking in the AWS community for help?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#kinesis-producer

On Thu, Dec 10, 2020 at 6:31 PM Avi Levi  wrote:

> Hi,
> Thanks for your reply. The problem is actually with the
> FlinkKinesisProducer and not the consumer (I did consume from the stream
> successfully). The keys are valid.
>
> On Thu, Dec 10, 2020 at 6:53 PM Matthias Pohl 
> wrote:
>
>> Hi Avi,
>> thanks for reaching out to the Flink community. I haven't worked with the
>> KinesisConsumer. Unfortenately, I cannot judge whether there's something
>> missing in your setup. But first of all: Could you confirm that the key
>> itself is valid? Did you try to use it in other cases?
>>
>> Best,
>> Matthias
>>
>> On Thu, Dec 10, 2020 at 12:48 PM Avi Levi  wrote:
>>
>>> Hi,
>>> Any help here will be greatly appreciated; I am about to throw in the
>>> towel, very frustrating...
>>> I am trying to put record on kinesalite with the following configuration :
>>>
>>> System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
>>>  "true")
>>>   
>>> System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, 
>>> "true")
>>>   System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
>>>   
>>> System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,"true")
>>>
>>>   val producerConfig = new Properties()
>>>   producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>>>   producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
>>>   producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
>>>   producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, 
>>> "http://localhost:4567";)
>>>   producerConfig.put( "VerifyCertificate", "false")
>>>
>>> However putting a record on the stream :
>>>
>>>   val producer = new FlinkKinesisProducer(new SimpleStringSchema(), 
>>> producerConfig)
>>>   producer.setFailOnError(true)
>>>   producer.setDefaultStream(outputStreamName)
>>>   producer.setDefaultPartition("0")
>>>
>>>   val kinesis =
>>> env.addSource(new FlinkKinesisConsumer[String](
>>>   inputStreamName,new SimpleStringSchema, consumerConfig))
>>>   .addSink(producer)
>>>
>>> yields:
>>>
>>> Exception name: UnrecognizedClientException
>>> Error message: The security token included in the request is invalid.
>>> 6 response headers:
>>> connection : close
>>> content-length : 107
>>> content-type : application/x-amz-json-1.1
>>>
>>>
>>> ➜  ~ cat ~/.aws/credentials
>>> [default]
>>> aws_access_key_id = x
>>> aws_secret_access_key = x
>>> region = us-east-1
>>>
>>


Re: Official Flink 1.12.0 image

2020-12-22 Thread Matthias Pohl
Hi Robert,
there is a discussion about it in FLINK-20632 [1]. PR #9249 [2] still needs
to get reviewed. You might want to follow that PR as Xintong suggested in
[1].
I hope that helps.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-20632
[2] https://github.com/docker-library/official-images/pull/9249

On Tue, Dec 22, 2020 at 3:45 PM Robert Cullen  wrote:

> Does anyone know when an official 1.12.0 image will be available on Docker
> Hub?
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Idea import Flink source code

2021-01-12 Thread Matthias Pohl
Hi,
you might want to move these kinds of questions into the
user@flink.apache.org which is the mailing list for community support
questions [1].
Coming back to your question: Is it just me or is the image not accessible?
Could you provide a textual description of your problem?

Best,
Matthias


[1] https://flink.apache.org/community.html#mailing-lists

On Wed, Jan 13, 2021 at 6:18 AM penguin.  wrote:

> Hello,
> When importing the Flink source code into idea, the following error
> occurred.
> And several mirrors were configured in the setting file of maven, which
> did not solve the problem
>
>
>


Re: Re: Idea import Flink source code

2021-01-13 Thread Matthias Pohl
Don't forget to use the reply-all button when replying to threads on the
mailing lists. :-)

Have you tried building the project via command line using `mvn -DskipTests
-Dfast install` to pull all dependencies?
And just to verify: you didn't change the code, did you? We're talking
about the vanilla Flink source code...?

Matthias

On Wed, Jan 13, 2021 at 9:18 AM penguin.  wrote:

> Hi,
> Thank you for your reminding.
>  It seems that there is something wrong with putting the picture in the
> text.
>
> ▼Sync: at 2021/1/13 12:05 with 18 errors
>
> ▼Resolve dependencies 4 errors
>  Cannot resolve net.minidev:json-smart:2.3
>  Cannot resolve io.confluent:kafka-schema-registry-client:4.1.0
>  Cannot resolve com.nimbusds:nimbus-jose-jwt:9.4.1
>  Cannot resolve com.nimbusds:lang-tag:1.5
> ▼Resolve plugins 14 errors
>  Cannot resolve plugin org.codehaus.mojo:build-helper-maven-plugin: 
> 
>
> Best,
> penguin
>
>
>
>
> 在 2021-01-13 15:24:22,"Matthias Pohl"  写道:
>
> Hi,
> you might want to move these kinds of questions into the
> user@flink.apache.org which is the mailing list for community support
> questions [1].
> Coming back to your question: Is it just me or is the image not
> accessible? Could you provide a textual description of your problem?
>
> Best,
> Matthias
>
>
> [1] https://flink.apache.org/community.html#mailing-lists
>
> On Wed, Jan 13, 2021 at 6:18 AM penguin.  wrote:
>
>> Hello,
>> When importing the Flink source code into idea, the following error
>> occurred.
>> And several mirrors were configured in the setting file of maven, which
>> did not solve the problem
>>
>>
>>
>
>
>


Re: Re: Re: Idea import Flink source code

2021-01-13 Thread Matthias Pohl
The mvn command helps to identify whether your issue is related to Maven
and/or missing dependencies or whether it's an Intellij problem. Usually,
running `mvn clean install -DskipTests -Dfast` isn't required to import the
Flink project into Intellij.

Best,
Matthias

PS: reply adds only the immediate responder to the recipient lists (as
happened in your first reply) vs reply-all would also automatically add the
ML email address(es) (and other thread participants) to the CC list.

On Wed, Jan 13, 2021 at 9:49 AM penguin.  wrote:

> Hi,
> I click the reply button every time... Does this mean that only the
> replied person can see the email?
>
>
> If Maven fails to download plugins or dependencies, is mvn clean
> install -DskipTests a must?
> I'll try first.
>
> penguin
>
>
>
>
> 在 2021-01-13 16:35:10,"Matthias Pohl"  写道:
>
> Don't forget to use the reply-all button when replying to threads on the
> mailing lists. :-)
>
> Have you tried building the project via command line using `mvn
> -DskipTests -Dfast install` to pull all dependencies?
> And just to verify: you didn't change the code, did you? We're talking
> about the vanilla Flink source code...?
>
> Matthias
>
> On Wed, Jan 13, 2021 at 9:18 AM penguin.  wrote:
>
>> Hi,
>> Thank you for your reminding.
>>  It seems that there is something wrong with putting the picture in the
>> text.
>>
>> ▼Sync: at 2021/1/13 12:05 with 18 errors
>>
>> ▼Resolve dependencies 4 errors
>>  Cannot resolve net.minidev:json-smart:2.3
>>  Cannot resolve io.confluent:kafka-schema-registry-client:4.1.0
>>  Cannot resolve com.nimbusds:nimbus-jose-jwt:9.4.1
>>  Cannot resolve com.nimbusds:lang-tag:1.5
>> ▼Resolve plugins 14 errors
>>  Cannot resolve plugin org.codehaus.mojo:build-helper-maven-plugin: 
>> 
>>
>> Best,
>> penguin
>>
>>
>>
>>
>> 在 2021-01-13 15:24:22,"Matthias Pohl"  写道:
>>
>> Hi,
>> you might want to move these kinds of questions into the
>> user@flink.apache.org which is the mailing list for community support
>> questions [1].
>> Coming back to your question: Is it just me or is the image not
>> accessible? Could you provide a textual description of your problem?
>>
>> Best,
>> Matthias
>>
>>
>> [1] https://flink.apache.org/community.html#mailing-lists
>>
>> On Wed, Jan 13, 2021 at 6:18 AM penguin.  wrote:
>>
>>> Hello,
>>> When importing the Flink source code into idea, the following error
>>> occurred.
>>> And several mirrors were configured in the setting file of maven, which
>>> did not solve the problem
>>>
>>>
>>>


Re: Flink Application cluster/standalone job: some JVM Options added to Program Arguments

2021-01-17 Thread Matthias Pohl
Hi Alexey,
thanks for reaching out to the Flink community. I'm not 100% sure whether
you have an actual issue or whether it's just the changed behavior you are
confused about. The change you're describing was introduced in Flink 1.12
as part of the work on FLIP-104 [1] exposing the actual memory usage
through the web UI.
Does this answer your question?

Best,
Matthias

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-104%3A+Add+More+Metrics+to+Jobmanager

On Sat, Jan 16, 2021 at 5:35 AM Alexey Trenikhun  wrote:

> Hello,
> I was trying to deploy Flink 1.12.0 Application cluster on k8s, I have
> following job manager arguments:
> standalone-job --job-classname
> com.x.App --job-id  @/opt/flink/conf/fsp.conf
>
> However, when I print *args* from App.main():
>
> [@/opt/flink/conf/ssp.conf, -D,
> jobmanager.memory.off-heap.size=134217728b, -D,
> jobmanager.memory.jvm-overhead.min=280552338b, -D,
> jobmanager.memory.jvm-metaspace.size=268435456b, -D,
> jobmanager.memory.heap.size=2122317824b, -D,
> jobmanager.memory.jvm-overhead.max=280552338b]
>
> Looks like
> With Flink 1.11.3 same deployment worked as expected - the job was only
> getting [@/opt/flink/conf/ssp.conf]
>
> Thanks,
> Alexey
>


Re: Flink Application cluster/standalone job: some JVM Options added to Program Arguments

2021-01-19 Thread Matthias Pohl
You're right. Thinking about it and looking through the code, I agree: The
dynamic properties shouldn't be exposed in the main method. I was able to
reproduce the described behavior. I created FLINK-21024 covering this.

Thanks for reporting this issue, Alexey.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-21024
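
Until that is fixed, a purely hypothetical client-side workaround could be to strip the appended dynamic properties before the application parses its own arguments. A sketch (not an official recommendation):

```
import java.util.ArrayList;
import java.util.List;

public final class ArgsFilter {

    private ArgsFilter() {}

    // Drops every "-D" flag together with the "key=value" token that follows it.
    public static String[] dropDynamicProperties(String[] args) {
        List<String> filtered = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            if ("-D".equals(args[i]) && i + 1 < args.length) {
                i++; // skip the flag and its value
                continue;
            }
            filtered.add(args[i]);
        }
        return filtered.toArray(new String[0]);
    }
}
```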

On Mon, Jan 18, 2021 at 7:11 PM Alexey Trenikhun  wrote:

> Hi Matthias,
> As I understand FLIP-104 is about UI. My question is about what user job
> is supposed to do with arguments like this: “-D
> jobmanager.memory.off-heap.size=134217728b“, set system property ? If
> user code has nothing to do with such arguments, why Flink append these
> arguments to user JOB args?
> Thanks,
> Alexey
>
>
> --
> *From:* Matthias Pohl 
> *Sent:* Sunday, January 17, 2021 11:53:29 PM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Flink Application cluster/standalone job: some JVM Options
> added to Program Arguments
>
> Hi Alexey,
> thanks for reaching out to the Flink community. I'm not 100% sure whether
> you have an actual issue or whether it's just the changed behavior you are
> confused about. The change you're describing was introduced in Flink 1.12
> as part of the work on FLIP-104 [1] exposing the actual memory usage
> through the web UI.
> Does this answer your question?
>
> Best,
> Matthias
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-104%3A+Add+More+Metrics+to+Jobmanager
>
> On Sat, Jan 16, 2021 at 5:35 AM Alexey Trenikhun  wrote:
>
> Hello,
> I was trying to deploy Flink 1.12.0 Application cluster on k8s, I have
> following job manager arguments:
> standalone-job --job-classname
> com.x.App --job-id  @/opt/flink/conf/fsp.conf
>
> However, when I print *args* from App.main():
>
> [@/opt/flink/conf/ssp.conf, -D,
> jobmanager.memory.off-heap.size=134217728b, -D,
> jobmanager.memory.jvm-overhead.min=280552338b, -D,
> jobmanager.memory.jvm-metaspace.size=268435456b, -D,
> jobmanager.memory.heap.size=2122317824b, -D,
> jobmanager.memory.jvm-overhead.max=280552338b]
>
> Looks like
> With Flink 1.11.3 same deployment worked as expected - the job was only
> getting [@/opt/flink/conf/ssp.conf]
>
> Thanks,
> Alexey
>
>
>


Re: Question about setNestedFileEnumeration()

2021-01-22 Thread Matthias Pohl
Hi Wayne,
based on other mailing list discussion ([1]) you can assume that the
combination of FileProcessingMode.PROCESS_CONTINUOUSLY and setting
FileInputFormat.setNestedFileEnumeration to true should work as you expect
it to work.

Can you provide more context on your issue like log files? Which Flink
version are you using? Have you tried checking whether this also
applies when accessing a local directory?

Best,
Matthias

[1]
https://lists.apache.org/thread.html/86a23b4c44d92c3adeb9ff4a708365fe4099796fb32deb6319e0e17f%40%3Cuser.flink.apache.org%3E
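
For completeness, a minimal sketch of that combination (the S3 path and the scan interval are placeholders):

```
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousNestedRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String inputPath = "s3://foo.bar/";
        TextInputFormat format = new TextInputFormat(new Path(inputPath));
        format.setNestedFileEnumeration(true); // also descend into sub-directories

        // Re-scan the directory every 60s and pick up newly added directories/files.
        DataStream<String> lines = env.readFile(
                format, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);

        lines.print();
        env.execute("continuous-nested-read");
    }
}
```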

On Fri, Jan 22, 2021 at 2:32 AM Billy Bain  wrote:

> I sent this a little prematurely. Will the streaming process find new
> directories under the parent?
>
> If the input path is
> s3://foo.bar/
> and directories are added daily, should I expect that the newly added
> directories+files will get processed?
>
> Thanks!
>
> Wayne
>
> On 2021/01/21 23:20:41, Billy Bain  wrote:
> > I have a Streaming process where new directories are added daily in S3.
> >
> > s3://foo/bar/2021-01-18/data.gz
> > s3://foo/bar/2021-01-19/data.gz
> > s3://foo/bar/2021-01-20/data.gz
> >
> > If I start the process, it won't pick up anything other than the
> > directories visible when the process was started.
> >
> > The textInput has this applied:
> > textInputFormat.setNestedFileEnumeration(true);
> >
> > DataStreamSource<String> lines = env.readFile(textInputFormat, inputPath,
> > FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);
> > --
> > Wayne D. Young
> > aka Billy Bob Bain
> > billybobb...@gmail.com
> >
>


Re: JDBC connection pools

2021-01-22 Thread Matthias Pohl
Hi Marco,
have you had a look into the connector documentation ([1] for the regular
connector or [2] for the SQL connector)? Maybe, discussions about
connection pooling in [3] and [4] or the code snippets provided in the
JavaDoc of JdbcInputFormat [5] help as well.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.html
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-handle-JDBC-connections-in-a-topology-td29004.html
[4]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Should-I-use-static-database-connection-pool-td30537.html
[5]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java#L55
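
As an illustration for questions 2 and 3: one common pattern is a small connection pool per parallel subtask, created in open() and closed in close(). Below is a rough sketch using HikariCP; the JDBC URL, credentials, table and SQL are made-up placeholders, and this is not an official recommendation.

```
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;

public class PooledJdbcSink extends RichSinkFunction<String> {

    private transient HikariDataSource dataSource;

    @Override
    public void open(Configuration parameters) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://db-host:5432/mydb"); // placeholder
        config.setUsername("user");                               // placeholder
        config.setPassword("secret");                             // placeholder
        config.setMaximumPoolSize(2);       // keep it small: one pool per parallel subtask
        config.setConnectionTestQuery("SELECT 1"); // lets the pool validate connections
        this.dataSource = new HikariDataSource(config);
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt =
                     conn.prepareStatement("INSERT INTO events (payload) VALUES (?)")) {
            stmt.setString(1, value);
            stmt.executeUpdate();
        }
    }

    @Override
    public void close() {
        if (dataSource != null) {
            dataSource.close();
        }
    }
}
```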

On Thu, Jan 21, 2021 at 8:19 PM Marco Villalobos 
wrote:

> Currently, my jobs that require JDBC initialize a connection in the open
> method directly via JDBC driver.
>
> 1. What are the established best practices for this?
> 2. Is it better to use a connection pool that can validate the connection
> and reconnect?
> 3. Would each operator require its own connection pool?
>
> I'd like the communities thought on this topic.
>


Re: Handling validations/errors in the flink job

2021-01-22 Thread Matthias Pohl
Hi Sagar,
have you had a look at CoProcessFunction [1]? CoProcessFunction enables you
to join two streams into one and also provide context to use SideOutput [2].

Best,
Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
[2]
https://ci.apache.org/projects/flink/flink-docs-master/learn-flink/event_driven.html#side-outputs
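
To sketch the idea (a skeleton only, with made-up (key, payload) tuple inputs; the keyed-state buffering and the timer that detects a missing partner are only indicated in comments):

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Skeleton of a "left join with error reporting": unmatched keys go to a side output.
public class LeftJoinWithErrors
        extends KeyedCoProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>, String> {

    public static final OutputTag<String> ERRORS = new OutputTag<String>("join-errors") {};

    @Override
    public void processElement1(Tuple2<String, String> left, Context ctx, Collector<String> out)
            throws Exception {
        // A real implementation would buffer `left` in keyed state and register a timer,
        // reporting only if no matching right-side element arrives before the timer fires.
        // As a placeholder, this skeleton reports immediately:
        ctx.output(ERRORS, "no matching right-side record for key " + ctx.getCurrentKey());
    }

    @Override
    public void processElement2(Tuple2<String, String> right, Context ctx, Collector<String> out)
            throws Exception {
        // Symmetric handling: look up the buffered left-side element and emit the joined result.
        out.collect(ctx.getCurrentKey() + " joined: " + right.f1);
    }
}

// Usage (sketch):
//   SingleOutputStreamOperator<String> joined = leftStream
//       .connect(rightStream)
//       .keyBy(l -> l.f0, r -> r.f0)
//       .process(new LeftJoinWithErrors());
//   DataStream<String> errors = joined.getSideOutput(LeftJoinWithErrors.ERRORS);
```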

On Wed, Jan 20, 2021 at 4:51 PM sagar  wrote:

> Hi Team,
>
>
> I am creating a Flink job with the DataStream API in batch mode.
>
> It has 5 different bounded sources, and I need to perform some
> business operations on them like joining, aggregating, etc.
>
>
>
> I am using a CoGroup operator to join two streams as it serves as a left
> join. So when keys are present in both the stream, I am processing and
> moving ahead.
>
> But when there is only one key present I need to send it as an error.
>
>
>
> Some operators like Process have side output features, but CoGroup doesn't
> have that feature.
>
>
>
> In order to report missing data to different stream, I am planning to
> create one common error handling stream and at each CoGroup operation I am
> planning to write it to error stream by using Split operator after CoGroup
>
>
>
> Let me know if that is the correct way of handling the errors?
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not the
> intended recipient please ignore this email.
>


Re: Re: Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka

2021-01-22 Thread Matthias Pohl
Hi Smile,
Have you used a clean checkout? I second Robert's statement considering
that the dependency you're talking about is already part
of flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml. It
also has the correct scope set both in master and release-1.12.

Best,
Matthias

On Fri, Jan 22, 2021 at 10:04 AM Smile@LETTers 
wrote:

> Yes, I've tried from both the root directory and the sub module. Neither
> or them works. And the error messages are the same.
>
> At 2021-01-21 23:22:12, "Robert Metzger"  wrote:
>
> Since our CI system is able to build Flink, I believe it's a local issue.
>
> Are you sure that the build is failing when you build Flink from the root
> directory (not calling maven from within a maven module?)
>
> On Tue, Jan 19, 2021 at 11:19 AM Smile@LETTers 
> wrote:
>
>> Hi,
>> I got an error when tried to compile & package Flink (version 1.12 &
>> current master).
>> It can be reproduced by run 'mvn clean test' under
>> flink-end-to-end-tests/flink-end-to-end-tests-common-kafka.
>>
>> It seems that a necessary dependency for test scope was missing and some
>> classes can not be found.
>> After adding the dependency kafka-avro-serializer to the pom of
>> flink-end-to-end-tests/flink-end-to-end-tests-common-kafka everything goes
>> well.
>>
>> And I just wonder that is this a bug or I missed some local setting?
>>
>> Best regards.
>> Smile
>>
>> --
>> Error logs attached:
>>
>>
>> [INFO] < org.apache.flink:flink-end-to-end-tests-common-kafka
>> >
>> [INFO] Building Flink : E2E Tests : Common Kafka 1.13-SNAPSHOT
>> [INFO] [ jar
>> ]-
>> Downloading ...
>> [INFO]
>> [INFO] --- maven-clean-plugin:3.1.0:clean (default-clean) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO] Deleting
>> /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/target
>> [INFO]
>> [INFO] --- maven-checkstyle-plugin:2.17:check (validate) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO]
>> [INFO] --- spotless-maven-plugin:2.4.2:check (spotless-check) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO]
>> [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven-version)
>> @ flink-end-to-end-tests-common-kafka ---
>> [INFO]
>> [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO]
>> [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (ban-unsafe-snakeyaml)
>> @ flink-end-to-end-tests-common-kafka ---
>> [INFO]
>> [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (ban-unsafe-jackson) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO]
>> [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (forbid-log4j-1) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO]
>> [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-versions) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO]
>> [INFO] --- gmavenplus-plugin:1.8.1:execute (merge-categories) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO] Using plugin classloader, includes GMavenPlus classpath.
>> [INFO] Using Groovy 2.5.12 to perform execute.
>> includes: org.apache.flink.tests.util.categories.Dummy
>> excludes:
>> [INFO]
>> [INFO] --- directory-maven-plugin:0.1:highest-basedir (directories) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO] Highest basedir set to:
>> /Users/smile/Downloads/W/code/flink/apache/master/flink
>> [INFO]
>> [INFO] --- maven-remote-resources-plugin:1.5:process
>> (process-resource-bundles) @ flink-end-to-end-tests-common-kafka ---
>> [INFO]
>> [INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO] Using 'UTF-8' encoding to copy filtered resources.
>> [INFO] skip non existing resourceDirectory
>> /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/resources
>> [INFO] Copying 3 resources
>> [INFO]
>> [INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @
>> flink-end-to-end-tests-common-kafka ---
>> [INFO] Compiling 5 source files to
>> /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/target/classes
>> [INFO]
>> /Users/smile/Downloads/W/code

Re: Flink 1.11 checkpoint compatibility issue

2021-01-22 Thread Matthias Pohl
Hi Lu,
thanks for reaching out to the community. Interesting observation.
There's no change between 1.9.1 and 1.11 that could explain this behavior
as far as I can tell. Have you had a chance to debug the code? Can you
provide the code so that we could look into it more closely?
Another thing: Are you using the TableAPI in your job? There might be some
problems with setting the maxParallelism in the TableAPI.

Keep in mind that you could use the State Processor API [1] to adjust the
maxParallelism per Operator in a Savepoint.
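
Independent of that, a small illustrative fragment (operator, uid and class
names are placeholders) of pinning the maximum parallelism explicitly in the
DataStream API, so that no derived defaults come into play:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(900);          // job-wide upper bound for key groups

stream.keyBy(value -> value.getKey())
      .map(new MyMapper())           // MyMapper is a placeholder
      .uid("my-mapper")              // stable uid helps map state on restore
      .setMaxParallelism(900);       // explicit per-operator max parallelism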

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#modifying-savepoints

On Fri, Jan 22, 2021 at 12:49 AM Lu Niu  wrote:

> Hi,
>
> We recently migrated from 1.9.1 to flink 1.11 and notice the new job
> cannot consume from savepoint taken in 1.9.1. Here is the list of operator
> id and max parallelism of savepoints taken in both versions. The only code
> change is version upgrade.
>
> savepoint 1.9.1:
> ```
> Id: 8a74550ce6afad759d5f1d6212f43f4a, maxparallsim: 1024
> Id: 21753033b264736cab2e32934441d610, maxparallsim: 4096
> Id: e03cdfcd66012e06dc52531958e54e8d, maxparallsim: 1024
> Id: d003b5c018424b83b771743563711891, maxparallsim: 900
> Id: bb0026f9180b3842f4d781c5f7a4a88f, maxparallsim: 4096
> Id: b0936afefebc629e050a0f423f44e6ba, maxparallsim: 4096
> Id: 5b3f7c70ad9b86408e6af3715f928ad1, maxparallsim: 1024
> Id: 278b3965ca58e95e78ab40884c5ddceb, maxparallsim: 900
> Id: 6d0402ca998f4658c7632930a69811ac, maxparallsim: 1024
> Id: 594970a50fc65ebd163a055fb972541e, maxparallsim: 900
> Id: fba56b0a0ee00414d9913103a7c19ff7, maxparallsim: 4096
> ```
>
> savepoint 1.11:
> ```
> Id: 8a74550ce6afad759d5f1d6212f43f4a, maxparallsim: 900
> Id: 21753033b264736cab2e32934441d610, maxparallsim: 900
> Id: e03cdfcd66012e06dc52531958e54e8d, maxparallsim: 900
> Id: d1bc8d10e5b8e98e55b2b6c5444f83c7, maxparallsim: 900
> Id: d003b5c018424b83b771743563711891, maxparallsim: 900
> Id: bb0026f9180b3842f4d781c5f7a4a88f, maxparallsim: 900
> Id: b0936afefebc629e050a0f423f44e6ba, maxparallsim: 900
> Id: 5b3f7c70ad9b86408e6af3715f928ad1, maxparallsim: 900
> Id: 278b3965ca58e95e78ab40884c5ddceb, maxparallsim: 900
> Id: 6d0402ca998f4658c7632930a69811ac, maxparallsim: 900
> Id: 594970a50fc65ebd163a055fb972541e, maxparallsim: 900
> Id: fba56b0a0ee00414d9913103a7c19ff7, maxparallsim: 900
> ```
>
> In the code we use env.setMaxParallsim(900). it is strange that savepoint
> 1.9.1 has different max parallelism for different operators and we don't
> know where 1024 and 4096 come from. Here I want to ask the community is it
> possible these are set by flink itself?
>
> Best
> Lu
>


Re: [Flink SQL] CompilerFactory cannot be cast error when executing statement

2021-01-22 Thread Matthias Pohl
Hi Sebastián,
have you tried changing the dependency scope to provided
for flink-table-planner-blink as it is suggested in [1]?

Best,
Matthias

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-10-exception-Unable-to-instantiate-java-compiler-td38221.html

On Fri, Jan 22, 2021 at 4:04 PM Sebastián Magrí 
wrote:

> Hi!
>
> I'm trying out Flink SQL with the attached docker-compose file.
>
> It starts up and then I create a table with the following statement:
>
> CREATE TABLE mytable_simple (
>   `customer_id` INT
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:postgresql://pgusr:pgpwd@postgres/pdgb',
>   'table-name' = 'mytable'
> );
>
> However when I try to run this:
>
> select * from mytable_simple;
>
> I get the following error in the client:
>
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot
> be cast to org.codehaus.commons.compiler.ICompilerFactory
>
> At first I thought it could be an incompatibility issue with the libraries
> I was putting in, like the postgres-cdc library version, but even after
> leaving only the JDBC libraries in I still get the same error.
>
> It'd be great if you could give me some pointers here.
>
> Thanks!
>
> --
> Sebastián Ramírez Magrí
>


Re: [Flink SQL] CompilerFactory cannot be cast error when executing statement

2021-01-22 Thread Matthias Pohl
Ok, to be fair, I just did some research on the error message and didn't
realize that you're working with binaries only.

I tried to set it up on my machine to be able to reproduce your error.
Unfortunately, I wasn't able to establish the connection between Flink and
Postgres using your docker-compose.yml.
I'm going to cc Timo. Maybe, he has a guess what's causing this error.

Best,
Matthias

On Fri, Jan 22, 2021 at 4:35 PM Sebastián Magrí 
wrote:

> Hi Matthias!
>
> I went through that thread but as I'm just using the `apache/flink` docker
> image for testing I honestly couldn't figure out how I would do that since
> I don't have a pom file to edit. If it's possible to do it through the
> configuration I'd be glad if you could point me out in the right direction.
>
> Pretty evident I don't have a lot of experience with mvn or "modern" Java
> in general.
>
> :-)
>
> Thanks!
>
> On Fri, 22 Jan 2021 at 15:19, Matthias Pohl 
> wrote:
>
>> Hi Sebastián,
>> have you tried changing the dependency scope to provided
>> for flink-table-planner-blink as it is suggested in [1]?
>>
>> Best,
>> Matthias
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-10-exception-Unable-to-instantiate-java-compiler-td38221.html
>>
>> On Fri, Jan 22, 2021 at 4:04 PM Sebastián Magrí 
>> wrote:
>>
>>> Hi!
>>>
>>> I'm trying out Flink SQL with the attached docker-compose file.
>>>
>>> It starts up and then I create a table with the following statement:
>>>
>>> CREATE TABLE mytable_simple (
>>>   `customer_id` INT
>>> ) WITH (
>>>   'connector' = 'jdbc',
>>>   'url' = 'jdbc:postgresql://pgusr:pgpwd@postgres/pdgb',
>>>   'table-name' = 'mytable'
>>> );
>>>
>>> However when I try to run this:
>>>
>>> select * from mytable_simple;
>>>
>>> I get the following error in the client:
>>>
>>> [ERROR] Could not execute SQL statement. Reason:
>>> java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot
>>> be cast to org.codehaus.commons.compiler.ICompilerFactory
>>>
>>> At first I thought it could be an incompatibility issue with the
>>> libraries I was putting in, like the postgres-cdc library version, but even
>>> after leaving only the JDBC libraries in I still get the same error.
>>>
>>> It'd be great if you could give me some pointers here.
>>>
>>> Thanks!
>>>
>>> --
>>> Sebastián Ramírez Magrí
>>>
>>
>
> --
> Sebastián Ramírez Magrí
>



Re: unsubscribe

2021-01-24 Thread Matthias Pohl
Hi Abhishek,
unsubscribing works by sending an email to user-unsubscr...@flink.apache.org
as stated in [1].

Best,
Matthias

[1] https://flink.apache.org/community.html#mailing-lists

On Sun, Jan 24, 2021 at 3:06 PM Abhishek Jain  wrote:

> unsubscribe
>


Re: FlinkSQL submit query and then the jobmanager failed.

2021-01-24 Thread Matthias Pohl
Hi,
thanks for reaching out to the community. I'm not a Hive or ORC format
expert. But could it be that this is a configuration problem? The error is
caused by an ArrayIndexOutOfBounds exception in
ValidReadTxnList.readFromString on an array generated by splitting a String
using colons as separators [1]. This method processes the value of the
configuration parameter hive.txn.valid.txns. Could it be that this
parameter is defined but not properly set (a value having no colon included
at all might cause this exception for instance)? Or is this parameter not
set by the user him-/herself?
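
Just to illustrate the failure mode (this is not the actual Hive code, only
the general pattern of splitting on ':' and indexing into the result):

String src = "12345";                   // e.g. a value without any ':' in it
String[] parts = src.split(":");        // -> ["12345"], i.e. length 1
long first = Long.parseLong(parts[0]);  // works
long second = Long.parseLong(parts[1]); // ArrayIndexOutOfBoundsException: 1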

Best,
Matthias

[1]
https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java#L190-L192

On Sun, Jan 24, 2021 at 2:48 PM 赵一旦  wrote:

> As the title, my query sql is very simple, it just select all columns from
> a hive table(version 1.2.1; orc format).  When the sql is submitted, after
> several seconds, the jobmanager is failed. Here is the Jobmanager's log.
> Does anyone can help to this problem?
>
> 2021-01-24 04:41:24,952 ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL: 
> Thread 'flink-akka.actor.default-dispatcher-2' produced an uncaught 
> exception. Stopping the process...
>
> java.util.concurrent.CompletionException: 
> org.apache.flink.util.FlinkRuntimeException: Failed to start the operator 
> coordinators
> at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>  ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>  ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722) 
> ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
>  ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023) 
> ~[?:1.8.0_251]
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>  ~[flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flink-dists-extended_2.11-1.12.0.jar:?]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:

Re: Re: Re: Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka

2021-01-25 Thread Matthias Pohl
Hi Smile,
you missed installing the pom provided by mvnrepository.org [1]. Maven will
install a basic pom if none is provided [2]. This basic pom file will not
include any dependencies. You should be able to fix your problem by running
your command above but adding the -DpomFile property with the pom file
provided in [1]:

mvn install:install-file -DgroupId=io.confluent
-DartifactId=kafka-avro-serializer -Dversion=5.5.2 -Dpackaging=jar
-Dfile=kafka-avro-serializer-5.5.2.jar
-DpomFile=kafka-avro-serializer-5.5.2.pom

[1]
https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/5.5.2/kafka-avro-serializer-5.5.2.pom
[2]
https://maven.apache.org/plugins/maven-install-plugin/install-file-mojo.html#pomFile

On Mon, Jan 25, 2021 at 8:25 AM Smile@LETTers  wrote:

> Hi Matthias,
> Sorry for being misleading. I meant kafka-schema-serializer rather than
> kafka-avro-serializer.
>
> io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe is in
> kafka-schema-serializer and kafka-schema-serializer should be a dependency
> of kafka-avro-serializer according to their pom.xml files(see [1], [2]).
> I couldn't resolve a valid kafka-avro-serializer.jar in my mirror so I
> downloaded it manually from [3] and installed it using:
> mvn install:install-file -DgroupId=io.confluent
> -DartifactId=kafka-avro-serializer -Dversion=5.5.2 -Dpackaging=jar
> -Dfile=kafka-avro-serializer-5.5.2.jar
> After that, I tried to build Flink and got the above exceptions. Then I
> tried to add the dependency of kafka-schema-serializer to
> flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml (also
> manually installed it to my local maven repo) and everything went well.
> I also tried to remove it from the pom.xml after installing, and the
> exception came back.
> Maybe there was something wrong with the manually-installed
> kafka-avro-serializer?
>
>
> [1].
> https://mvnrepository.com/artifact/io.confluent/kafka-schema-serializer/usages
> [2].
> https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/5.5.2/kafka-avro-serializer-5.5.2.pom
> [3]. https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer
>
> At 2021-01-22 21:22:51, "Matthias Pohl"  wrote:
>
> Hi Smile,
> Have you used a clean checkout? I second Robert's statement considering
> that the dependency you're talking about is already part
> of flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml. It
> also has the correct scope set both in master and release-1.12.
>
> Best,
> Matthias
>
> On Fri, Jan 22, 2021 at 10:04 AM Smile@LETTers 
> wrote:
>
>> Yes, I've tried from both the root directory and the sub module. Neither
>> or them works. And the error messages are the same.
>>
>> At 2021-01-21 23:22:12, "Robert Metzger"  wrote:
>>
>> Since our CI system is able to build Flink, I believe it's a local issue.
>>
>> Are you sure that the build is failing when you build Flink from the root
>> directory (not calling maven from within a maven module?)
>>
>> On Tue, Jan 19, 2021 at 11:19 AM Smile@LETTers 
>> wrote:
>>
>>> Hi,
>>> I got an error when tried to compile & package Flink (version 1.12 &
>>> current master).
>>> It can be reproduced by run 'mvn clean test' under
>>> flink-end-to-end-tests/flink-end-to-end-tests-common-kafka.
>>>
>>> It seems that a necessary dependency for test scope was missing and some
>>> classes can not be found.
>>> After adding the dependency kafka-avro-serializer to the pom of
>>> flink-end-to-end-tests/flink-end-to-end-tests-common-kafka everything goes
>>> well.
>>>
>>> And I just wonder that is this a bug or I missed some local setting?
>>>
>>> Best regards.
>>> Smile
>>>
>>> --
>>> Error logs attached:
>>>
>>>
>>> [INFO] < org.apache.flink:flink-end-to-end-tests-common-kafka
>>> >
>>> [INFO] Building Flink : E2E Tests : Common Kafka 1.13-SNAPSHOT
>>> [INFO] [ jar
>>> ]-
>>> Downloading ...
>>> [INFO]
>>> [INFO] --- maven-clean-plugin:3.1.0:clean (default-clean) @
>>> flink-end-to-end-tests-common-kafka ---
>>> [INFO] Deleting
>>> /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/target
>>> [INFO]
>>> [INFO] --- maven-checkstyle-plugin:2.17:check (validate) @
>>> flink-end-to-end-tests-common-kafka ---
>>> [INFO]
>>> [INFO]

Re: memory tuning

2021-01-26 Thread Matthias Pohl
Hi Marco,
Could you share the preconfiguration logs? They are printed at the
beginning of the taskmanagers' logs and contain a summary of the used
memory configuration.

Best,
Matthias

On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos 
wrote:

>
> I have a flink job that collects and aggregates time-series data from many
> devices into one object (let's call that X) that was collected by a window.
>
> X contains time-series data, so it contains many String, Instant, a
> HashMap, and another type (Let's call Y) objects.
>
> When I collect 4 X instances, and it contains 80 Y instances, that
> equates to approximately 172 MB of data.
>
> That should be okay, because my machine has 32 GB ram, and I allocated 1.5
> GB to each task manager.
>
> However, it fails due to out of memory errors, and I think it happens
> during serialization. I am not sure if that's a coincidence or fact.
>
> I am using RocksDB state backend, as well Kryo serialization.
>
> I am already refactoring my code from Processing Time semantics to Event
> Time semantics, and I am trying to store smaller sized objects in keyed
> state, rather than this large object, but in the meantime, our machines
> have plenty of memory. What can I do to fix this?
>
> SAMPLE STACK TRACE
>
> 2020-12-17 02:45:55,524 WARN  org.apache.flink.runtime.taskmanager.Task
>  [] - enrich information related to tag metadata to sensor
> time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d) switched from RUNNING
> to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
> exception while processing timer.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1088)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1062)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1183)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [965ae4d.jar:?]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [965ae4d.jar:?]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
> Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> ... 12 more
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
> 2020-12-17 02:45:55,554 INFO  org.apache.flink.runtime.taskmanager.Task
>  [] - Triggering cancellation of task code enrich
> information related to tag metadata to sensor time series (2/2)
> (b6ba76c4cc6bc7fdbd2c69332442742d).
> 2020-12-17 02:45:38,981 WARN  org.apache.flink.runtime.taskmanager.Task
>  [] - aggregate daily average window function (2/2)
> (745ff4669f9c1812de5b717c87a36a26) switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
> sun.reflect.GeneratedSerializationConstructorAccessor230.newInstance(Unknown
> Source) ~[?:?]
> at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> ~[?:1.8.0_252]
> at
> org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45)
> ~[965ae4d.jar:?]
> at com.esotericsoftware.kryo.Kryo.newInstan

Re: JobManager seems to be leaking temporary jar files

2021-01-26 Thread Matthias Pohl
Hi Maciek,
my understanding is that the jars in the JobManager should be cleaned up
after the job is terminated (I assume that your jobs successfully
finished). The jars are managed by the BlobService. The dispatcher will
trigger the jobCleanup in [1] after job termination. Are there any
suspicious log messages that might indicate an issue?
I'm adding Chesnay to this thread as he might have more insights here.

[1]
https://github.com/apache/flink/blob/2c4e0ab921ccfaf003073ee50faeae4d4e4f4c93/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L797

On Mon, Jan 25, 2021 at 8:37 PM Maciek Próchniak  wrote:

> Hello,
>
> in our setup we have:
>
> - Flink 1.11.2
>
> - job submission via REST API (first we upload jar, then we submit
> multiple jobs with it)
>
> - additional jars embedded in lib directory of main jar (this is crucial
> part)
>
> When we submit jobs this way, Flink creates new temp jar files via
> PackagedProgram.extractContainedLibraries method.
>
> We observe that they are not removed after job finishes - it seems that
> PackagedProgram.deleteExtractedLibraries is not invoked when using REST
> API.
>
> What's more, it seems that those jars remain open in JobManager process.
> We observe that when we delete them manually via scripts, the disk space
> is not reclaimed until process is restarted, we also see via heap dump
> inspection that java.util.zip.ZipFile$Source  objects remain, pointing
> to those files. This is quite a problem for us, as we submit quite a few
> jobs, and after a while we ran out of either heap or disk space on
> JobManager process/host. Unfortunately, I cannot so far find where this
> leak would happen...
>
> Does anybody have some pointers where we can search? Or how to fix this
> behaviour?
>
>
> thanks,
>
> maciek
>
>


Re: memory tuning

2021-01-27 Thread Matthias Pohl
c.port, 6123
> 2021-01-25 21:41:18,046 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.memory.process.size, 1600m
> 2021-01-25 21:41:18,046 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.memory.process.size, 1728m
> 2021-01-25 21:41:18,046 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2021-01-25 21:41:18,046 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: parallelism.default, 1
> 2021-01-25 21:41:18,046 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.execution.failover-strategy, region
> 2021-01-25 21:41:18,047 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: s3.endpoint, http://localhost:4566
> 2021-01-25 21:41:18,048 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: s3.path.style.access, true
> 2021-01-25 21:41:18,048 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.memory.process.size, 4g
>
>
> On Tue, Jan 26, 2021 at 12:34 AM Matthias Pohl 
> wrote:
>
>> Hi Marco,
>> Could you share the preconfiguration logs? They are printed in the
>> beginning of the taskmanagers' logs and contain a summary of the used
>> memory configuration?
>>
>> Best,
>> Matthias
>>
>> On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <
>> mvillalo...@kineteque.com> wrote:
>>
>>>
>>> I have a flink job that collects and aggregates time-series data from
>>> many devices into one object (let's call that X) that was collected by a
>>> window.
>>>
>>> X contains time-series data, so it contains many String, Instant, a
>>> HashMap, and another type (Let's call Y) objects.
>>>
>>> When I collect 4 X instances, and it contains 80 Y instances, that
>>> equates to approximately 172 MB of data.
>>>
>>> That should be okay, because my machine has 32 GB ram, and I allocated
>>> 1.5 GB to each task manager.
>>>
>>> However, it fails due to out of memory errors, and I think it happens
>>> during serialization. I am not sure if that's a coincidence or fact.
>>>
>>> I am using RocksDB state backend, as well Kryo serialization.
>>>
>>> I am already refactoring my code from Processing Time semantics to Event
>>> Time semantics, and I am trying to store smaller sized objects in keyed
>>> state, rather than this large object, but in the meantime, our machines
>>> have plenty of memory. What can I do to fix this?
>>>
>>> SAMPLE STACK TRACE
>>>
>>> 2020-12-17 02:45:55,524 WARN  org.apache.flink.runtime.taskmanager.Task
>>>[] - enrich information related to tag metadata to
>>> sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d) switched from
>>> RUNNING to FAILED.
>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>> exception while processing timer.
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1088)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1062)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1183)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270)
>>> [flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.stre

Re: [ANNOUNCE] Apache Flink 1.10.3 released

2021-02-01 Thread Matthias Pohl
Yes, thanks for taking over the release!

Best,
Matthias

On Mon, Feb 1, 2021 at 5:04 AM Zhu Zhu  wrote:

> Thanks Xintong for being the release manager and everyone who helped with
> the release!
>
> Cheers,
> Zhu
>
> Dian Fu  于2021年1月29日周五 下午5:56写道:
>
>> Thanks Xintong for driving this release!
>>
>> Regards,
>> Dian
>>
>> 在 2021年1月29日,下午5:24,Till Rohrmann  写道:
>>
>> Thanks Xintong for being our release manager. Well done!
>>
>> Cheers,
>> Till
>>
>> On Fri, Jan 29, 2021 at 9:50 AM Yang Wang  wrote:
>>
>>> Thanks Xintong for driving this release.
>>>
>>> Best,
>>> Yang
>>>
>>> Yu Li  于2021年1月29日周五 下午3:52写道:
>>>
>>>> Thanks Xintong for being our release manager and everyone else who made
>>>> the release possible!
>>>>
>>>> Best Regards,
>>>> Yu
>>>>
>>>>
>>>> On Fri, 29 Jan 2021 at 15:05, Xintong Song  wrote:
>>>>
>>>>> The Apache Flink community is very happy to announce the release of
>>>>> Apache
>>>>> Flink 1.10.3, which is the third bugfix release for the Apache Flink
>>>>> 1.10
>>>>> series.
>>>>>
>>>>> Apache Flink® is an open-source stream processing framework for
>>>>> distributed, high-performing, always-available, and accurate data
>>>>> streaming
>>>>> applications.
>>>>>
>>>>> The release is available for download at:
>>>>> https://flink.apache.org/downloads.html
>>>>>
>>>>> Please check out the release blog post for an overview of the
>>>>> improvements
>>>>> for this bugfix release:
>>>>> https://flink.apache.org/news/2021/01/29/release-1.10.3.html
>>>>>
>>>>> The full release notes are available in Jira:
>>>>>
>>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348668
>>>>>
>>>>> We would like to thank all contributors of the Apache Flink community
>>>>> who
>>>>> made this release possible!
>>>>>
>>>>> Regards,
>>>>> Xintong Song
>>>>>
>>>>


Re: S3 parquet files as Sink in the Table SQL API

2021-02-10 Thread Matthias Pohl
Hi,
Have you tried using the bundled Hadoop uber jar [1]? It looks like some Hadoop
dependencies are missing.

Best,
Matthias

[1] https://flink.apache.org/downloads.html#additional-components

On Wed, Feb 10, 2021 at 1:24 PM meneldor  wrote:

> Hello,
> I am using PyFlink and I want to write records from the table sql api as
> parquet files on AWS S3. I followed the documentations but it seems that
> I'm missing some  dependencies or/and configuration. Here is the SQL:
>
>> CREATE TABLE sink_table(
>> `id` VARCHAR,
>> `type` VARCHAR,
>> `machn` VARCHAR,
>> `lastacct_id` BIGINT,
>> `upd_ts` BIGINT
>> ) WITH (
>>   'connector' = 'filesystem',
>>  'path' = 's3a://my-bucket/flink_sink',
>>  'format' = 'parquet'
>> )
>>
>> This is the configuration in flink-conf.yaml:
>
> s3.endpoint: https://s3.us-west-1.amazonaws.com
>> s3.path.style.access: true
>> s3.access-key: ***KEY-STRING***
>> s3.secret-key: ***KEY-SECRET-STRING***
>> s3.entropy.key: _entropy_
>> s3.entropy.length: 8
>> hadoop.s3.socket-timeout: 10m
>>
> I downloaded flink-s3-fs-hadoop-1.12.1.jar and
> flink-hadoop-compatibility_2.11-1.12.1.jar in plugins/ and
> flink-sql-parquet_2.11-1.12.1.jar in lib/
>
> Here is the exception:
>
>> Traceback (most recent call last):
>>   File "s3_sink.py", line 101, in 
>> """)
>>   File
>> "/home/user/miniconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py",
>> line 766, in execute_sql
>> return TableResult(self._j_tenv.executeSql(stmt))
>>   File
>> "/home/user/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py",
>> line 1286, in __call__
>> answer, self.gateway_client, self.target_id, self.name)
>>   File
>> "/home/user/miniconda3/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>> line 147, in deco
>> return f(*a, **kw)
>>   File
>> "/home/user/miniconda3/lib/python3.7/site-packages/py4j/protocol.py", line
>> 328, in get_return_value
>> format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o14.executeSql.
>> : java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
>>
> at
>> org.apache.flink.formats.parquet.ParquetFileFormatFactory.getParquetConfiguration(ParquetFileFormatFactory.java:115)
>
> at
>> org.apache.flink.formats.parquet.ParquetFileFormatFactory.access$000(ParquetFileFormatFactory.java:51)
>
> at
>> org.apache.flink.formats.parquet.ParquetFileFormatFactory$2.createRuntimeEncoder(ParquetFileFormatFactory.java:103)
>
> at
>> org.apache.flink.formats.parquet.ParquetFileFormatFactory$2.createRuntimeEncoder(ParquetFileFormatFactory.java:97)
>
> at
>> org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>
> at
>> org.apache.flink.table.filesystem.FileSystemTableSink.createStreamingSink(FileSystemTableSink.java:183)
>
> at
>> org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:145)
>
> at
>> org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>
> at
>> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>
> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:103)
>
> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>
> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>
> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>
> at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>
> at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
>
> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>
> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>
> at scala.collection.IterableLike$class.foreach(IterableLike.sc

Re: Flink’s Kubernetes HA services - NOT working

2021-02-10 Thread Matthias Pohl
Hi Daniel,
what's the exact configuration you used? Did you use the resource
definitions provided in the Standalone Flink on Kubernetes docs [1]? Did
you do certain things differently in comparison to the documentation?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#appendix

On Wed, Feb 10, 2021 at 1:31 PM Daniel Peled 
wrote:

>
> Hey,
>
> We are using standalone flink on kubernetes
> and we have followed the instructions in the following link: "Kubernetes
> HA Services"
>
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
> We were unable to make it work.
> We are facing a lot of problems.
> For example, some of the jobs don't start, complaining that there are not
> enough slots available - although there are enough slots and it seems as if
> the job manager is NOT aware of all the task managers.
> In another scenario we were unable to run any job at all.
> The flink dashboard is unresponsive and we get the error
> "flink service temporarily unavailable due to an ongoing leader election.
> please refresh".
> We believe we are missing some configurations.
> Are there any more detailed instructions?
> Any suggestions/tips?
> Attached is the log of the job manager in one of the attempts.
>
> Please give me some advice.
> BR,
> Danny
>
>


Re: How does Flink handle shorted lived keyed streams

2021-02-10 Thread Matthias Pohl
Hi narasimha,
not sure whether this fits your use case, but have you considered creating
a savepoint and analyzing it using the State Processor API [1]?
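
A minimal sketch of what such an offline analysis could look like.
Everything in it is illustrative: the savepoint path, the operator uid
"cep-operator", the state descriptor and the state backend are placeholders
that would have to match your actual job (this uses the batch-based State
Processor API as available in Flink 1.12):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class SavepointKeyCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(
            bEnv, "s3://bucket/savepoints/savepoint-xyz", new MemoryStateBackend());

        // read one keyed state of one operator; uid and descriptor must match the job
        DataSet<String> keys = savepoint.readKeyedState(
            "cep-operator",
            new KeyedStateReaderFunction<String, String>() {
                private ValueState<Long> counter;

                @Override
                public void open(Configuration parameters) {
                    counter = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("counter", Types.LONG));
                }

                @Override
                public void readKey(String key, Context ctx, Collector<String> out)
                        throws Exception {
                    out.collect(key + " -> " + counter.value());
                }
            });

        // how many keys are still kept in state?
        System.out.println("keys in state: " + keys.count());
    }
}

Counting the keys per operator over time should show whether state for old
keys is actually released or keeps accumulating.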

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#state-processor-api

On Wed, Feb 10, 2021 at 6:08 PM narasimha  wrote:

> It is not solving the problem.
>
> I could see the memory keep increasing, resulting in a lot of high GCs.
>
> There could be a memory leak, just want to know how to know if older keys
> are still alive, even after the pattern has been satisfied or the range
> of the pattern has expired.
>
> Can someone suggest how to proceed further.
>


Re: Should flink job manager crash during zookeeper upgrade?

2021-02-10 Thread Matthias Pohl
Hi Barisa,
thanks for sharing this. I'm gonna add Till to this thread. He might have
some insights.

Best,
Matthias

On Wed, Feb 10, 2021 at 4:19 PM Barisa Obradovic  wrote:

> I'm trying to understand if the behaviour of the flink jobmanager during
> zookeeper upgrade is expected or not.
>
> I'm running flink 1.11.2 in kubernetes, with zookeeper server 3.5.4-beta.
> While I'm doing zookeeper upgrade, there is a 20 seconds zookeeper
> downtime.
> I'd expect to either flink job to restart or few warnings in the logs
> during
> those 20 seconds. Instead, I see whole flink JVM crash ( and later the pod
> restart).
>
> I expected for flink to internally retry zookeeper requests, so I'm
> surprised it crashes. Is this expected, or is it a bug?
>
> From the logs
>
>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
> ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
> at
>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
> ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
> at
>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> [flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
> [09-Feb-2021 11:30:00.197 UTC] INFO
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
> Opening socket connection to server zdzk.servicexxx/192.168.190.92:2181
> [09-Feb-2021 11:30:00.197 UTC] INFO
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
> Socket connection established to zdzk.servicexxx/192.168.190.92:2181,
> initiating session
> [09-Feb-2021 11:30:00.198 UTC] WARN
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
> Session 0x3012b0057140004 for server zdzk.servicexxx/192.168.190.92:2181,
> unexpected error, closing socket connection and attempting reconnect
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_192]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_192]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> ~[?:1.8.0_192]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_192]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> ~[?:1.8.0_192]
> at
>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
> ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
> at
>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
> ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
> at
>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> [flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
> [09-Feb-2021 11:30:02.294 UTC] INFO
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
> Opening socket connection to server zdzk.servicexxx/192.168.190.92:2181
> [09-Feb-2021 11:30:02.295 UTC] INFO
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
> Socket connection established to zdzk.servicexxx/192.168.190.92:2181,
> initiating session
> [09-Feb-2021 11:30:02.295 UTC] WARN
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
> Session 0x3012b0057140004 for server zdzk.servicexxx/192.168.190.92:2181,
> unexpected error, closing socket connection and attempting reconnect
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_192]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_192]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> ~[?:1.8.0_192]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_192]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> ~[?:1.8.0_192]
> at
>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
> ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
> at
>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
> ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
> at
>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> [flink-shaded-zookeeper-3.4.14.jar:3.4.14-11.0]
> [09-Feb-2021 11:30:03.841 UTC] INFO
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
> Opening socket connection to server zdz

Re: Flink’s Kubernetes HA services - NOT working

2021-02-11 Thread Matthias Pohl
One other thing: It looks like you've set high-availability.storageDir to a
local path file:///opt/flink/recovery. You should use a storage path that
is accessible from all Flink cluster components (e.g. using S3). Only
references are stored in Kubernetes ConfigMaps [1].

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#configuration

On Wed, Feb 10, 2021 at 6:08 PM Matthias Pohl 
wrote:

> Hi Daniel,
> what's the exact configuration you used? Did you use the resource
> definitions provided in the Standalone Flink on Kubernetes docs [1]? Did
> you do certain things differently in comparison to the documentation?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#appendix
>
> On Wed, Feb 10, 2021 at 1:31 PM Daniel Peled 
> wrote:
>
>>
>> Hey,
>>
>> We are using standalone flink on kubernetes
>> and we have followed the instructions in the following link: "Kubernetes
>> HA Services"
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>> We were unable to make it work.
>> We are facing a lot of problems.
>> For example, some of the jobs don't start, complaining that there are not
>> enough slots available - although there are enough slots and it seems as if
>> the job manager is NOT aware of all the task managers.
>> In another scenario we were unable to run any job at all.
>> The flink dashboard is unresponsive and we get the error
>> "flink service temporarily unavailable due to an ongoing leader election.
>> please refresh".
>> We believe we are missing some configurations.
>> Are there any more detailed instructions?
>> Any suggestions/tips?
>> Attached is the log of the job manager in one of the attempts.
>>
>> Please give me some advice.
>> BR,
>> Danny
>>
>>

-- 

Matthias Pohl | Engineer

Follow us @VervericaData Ververica <https://www.ververica.com/>

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner


Re: threading and distribution

2021-02-11 Thread Matthias Pohl
Hi Marco,
sorry for the late reply. The documentation you found [1] is already a good
start. You can define how many subtasks of an operator run in parallel
using the operator's parallelism configuration [2]. Each operator's subtask
will run in a separate task slot. There's the concept of slot sharing as
described in [3] which enables Flink to run subtasks of different operators
of the same job in the same slot. This enables the TaskManager to run an
entire pipeline in a single slot [3].
The maximum parallelism of your job is bound by the number of available
task slots in the Flink cluster which can be defined through the number of
slots per TaskManager [4][5] and the number of TaskManagers running in your
Flink cluster (taskmanager.numberOfTaskSlots * #taskmanagers = maximum
possible parallelism for an operator/pipeline).
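
As a small illustration (the source and operators are placeholders, and the
snippet is meant to live inside a job's main method), the parallelism can be
set per job and overridden per operator:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);                          // default: 4 subtasks per operator

env.socketTextStream("localhost", 9999)         // this source is non-parallel anyway
   .map(String::toUpperCase).setParallelism(8)  // this operator runs with 8 subtasks
   .print().setParallelism(1);                  // the sink is collapsed to 1 subtask

env.execute("parallelism-example");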

I hope this was still helpful.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#:~:text=A%20Flink%20program%20consists%20of,task%20is%20called%20its%20parallelism
.
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/flink-architecture.html#task-slots-and-resources
[4]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html
[5]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-numberoftaskslots

On Fri, Feb 5, 2021 at 12:22 PM Marco Villalobos 
wrote:

> Okay, I am following up to my question. I see information regarding the
> threading and distribution model on the documentation about the
> architecture.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html
>
> Next, I want to read up on what I have control over.
>
> On Fri, Feb 5, 2021 at 3:06 AM Marco Villalobos 
> wrote:
>
>> as data flows from a source through a pipeline of operators and finally
>> sinks, is there a means to control how many threads are used within an
>> operator, and how an operator is distributed across the network?
>>
>> Where can I read up on these types of details specifically?
>>
>


Re: Question

2021-02-12 Thread Matthias Pohl
Hi Abu Bakar Siddiqur Rahman,
Have you had a look at the Flink documentation [1]? It provides
step-by-step instructions on how to run a job (the Flink binaries provide
example jobs under ./examples) using a local standalone cluster. This
should also work on a Mac. You would just need to start the Flink cluster
(./bin/start-cluster.sh) and submit a job using one of the example jars
provided in the binaries (e.g. ./bin/flink run -d
./examples/streaming/WordCount.jar). You can check the job running in
Flink's web UI being available under http://localhost:8081 if you use the
default configuration provided by the Flink binaries.
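
If you want to submit your own Java program instead of the bundled examples,
a minimal sketch (class and job names are made up) that you would package as
a jar and submit via ./bin/flink run could look like this:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HelloFlink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("hello", "flink")
           .map(String::toUpperCase)    // trivial transformation
           .print();                    // output ends up in the TaskManager logs

        env.execute("hello-flink");     // job name shown in the web UI
    }
}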

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html

On Thu, Feb 11, 2021 at 3:45 PM Abu Bakar Siddiqur Rahman Rocky <
bakar121...@gmail.com> wrote:

> Hi,
>
> Is there anyone who can inform me how I can connect a Java program to
> Apache Flink (in mac)?
>
> Thank you!
>
> Regards,
> Abu Bakar Siddiqur Rahman
>
> On Thu, Feb 11, 2021 at 4:26 AM Abu Bakar Siddiqur Rahman Rocky <
> bakar121...@gmail.com> wrote:
>
>> Hi Chesnay,
>>
>> Could you please inform me that how can I connect a Java program to
>> apache Flink (in mac)?
>>
>> Thank you!
>>
>> Regards,
>> Abu Bakar Siddiqur Rahman
>>
>> On Wed, Feb 3, 2021, 3:06 AM Chesnay Schepler  wrote:
>>
>>> Sure.
>>>
>>>
>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint
>>>
>>>
>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper
>>>
>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper
>>>
>>> On 2/3/2021 3:08 AM, Abu Bakar Siddiqur Rahman Rocky wrote:
>>>
>>> Hi,
>>>
>>> Is there any source code for the checkpoints, snapshot and zookeeper
>>> mechanism?
>>>
>>> Thank you
>>>
>>> On Mon, Feb 1, 2021 at 4:23 AM Chesnay Schepler 
>>> wrote:
>>>
>>>> Could you expand a bit on what you mean? Are you referring to
>>>> *savepoints*?
>>>>
>>>> On 1/28/2021 3:24 PM, Abu Bakar Siddiqur Rahman Rocky wrote:
>>>>
>>>> Hi,
>>>>
>>>> Is there any library to use and remember the apache flink snapshot?
>>>>
>>>> Thank you
>>>>
>>>> --
>>>> Regards,
>>>> Abu Bakar Siddiqur Rahman
>>>>
>>>>
>>>>
>>>>
>>>
>>> --
>>> Regards,
>>> Abu Bakar Siddiqur Rahman
>>> Graduate Research Student
>>> Natural Language Processing Laboratory
>>> Centro de Investigacion en Computacion
>>> Instituto Politecnico Nacional, Mexico City
>>>
>>>
>>>
>
> --
> Regards,
> Abu Bakar Siddiqur Rahman
>
>


Re: Question

2021-02-12 Thread Matthias Pohl
Checkpoints are stored in some DFS storage. The location can be specified
using state.checkpoints.dir configuration property [1]. You can access the
state of a savepoint or checkpoint using the State Processor API [2].

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

On Fri, Feb 12, 2021 at 5:35 PM Abu Bakar Siddiqur Rahman Rocky <
bakar121...@gmail.com> wrote:

> Thank you for your reply.
>
> Another Question:
> After Checkpointing, we save our snapshot to a storage. How can we access
> the storage?
>
> is this the source code:
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
>
> If it is not, could you please provide me the source code to access in the
> storage where snapshots are saved?
>
> Thank you
>
>
>
>
> On Fri, Feb 12, 2021 at 2:44 AM Matthias Pohl 
> wrote:
>
>> Hi Abu Bakar Siddiqur Rahman,
>> Have you had a look at the Flink documentation [1]? It provides
>> step-by-step instructions on how to run a job (the Flink binaries provide
>> example jobs under ./examples) using a local standalone cluster. This
>> should also work on a Mac. You would just need to start the Flink cluster
>> (./bin/start-cluster.sh) and submit a job using one of the example jars
>> provided in the binaries (e.g. ./bin/flink run -d
>> ./examples/streaming/WordCount.jar). You can check the job running in
>> Flink's web UI being available under http://localhost:8081 if you use
>> the default configuration provided by the Flink binaries.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html
>>
>> On Thu, Feb 11, 2021 at 3:45 PM Abu Bakar Siddiqur Rahman Rocky <
>> bakar121...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Is there anyone who can inform me how I can connect a Java program to
>>> Apache Flink (in mac)?
>>>
>>> Thank you!
>>>
>>> Regards,
>>> Abu Bakar Siddiqur Rahman
>>>
>>> On Thu, Feb 11, 2021 at 4:26 AM Abu Bakar Siddiqur Rahman Rocky <
>>> bakar121...@gmail.com> wrote:
>>>
>>>> Hi Chesnay,
>>>>
>>>> Could you please inform me that how can I connect a Java program to
>>>> apache Flink (in mac)?
>>>>
>>>> Thank you!
>>>>
>>>> Regards,
>>>> Abu Bakar Siddiqur Rahman
>>>>
>>>> On Wed, Feb 3, 2021, 3:06 AM Chesnay Schepler 
>>>> wrote:
>>>>
>>>>> Sure.
>>>>>
>>>>>
>>>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint
>>>>>
>>>>>
>>>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper
>>>>>
>>>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper
>>>>>
>>>>> On 2/3/2021 3:08 AM, Abu Bakar Siddiqur Rahman Rocky wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Is there any source code for the checkpoints, snapshot and zookeeper
>>>>> mechanism?
>>>>>
>>>>> Thank you
>>>>>
>>>>> On Mon, Feb 1, 2021 at 4:23 AM Chesnay Schepler 
>>>>> wrote:
>>>>>
>>>>>> Could you expand a bit on what you mean? Are you referring to
>>>>>> *savepoints*?
>>>>>>
>>>>>> On 1/28/2021 3:24 PM, Abu Bakar Siddiqur Rahman Rocky wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Is there any library to use and remember the apache flink snapshot?
>>>>>>
>>>>>> Thank you
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> Abu Bakar Siddiqur Rahman
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Abu Bakar Siddiqur Rahman
>>>>> Graduate Research Student
>>>>> Natural Language Processing Laboratory
>>>>> Centro de Investigacion en Computacion
>>>>> Instituto Politecnico Nacional, Mexico City
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Regards,
>>> Abu Bakar Siddiqur Rahman
>>>
>>>
>
> --
> Regards,
> Abu Bakar Siddiqur Rahman
>
>

-- 

Matthias Pohl | Engineer

Follow us @VervericaData Ververica <https://www.ververica.com/>

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner


Re: Flink’s Kubernetes HA services - NOT working

2021-02-15 Thread Matthias Pohl
I'm adding the Flink user ML to the conversation again.

On Mon, Feb 15, 2021 at 8:18 AM Matthias Pohl 
wrote:

> Hi Omer,
> thanks for sharing the configuration. You're right: Using NFS for HA's
> storageDir is fine.
>
> About the error message you're referring to: I haven't worked with the HA
> k8s service, yet. But the RBAC is a good hint. Flink's native Kubernetes
> documentation [1] points out that you can use a custom service account.
> This one needs special permissions to start/stop pods automatically (which
> does not apply in your case) but also to access ConfigMaps. You might want
> to try setting the permission as described in [1].
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
>
> On Sun, Feb 14, 2021 at 7:16 PM Omer Ozery  wrote:
>
>> Hey Matthias.
>> My name is Omer, i am Daniel's devops, i will elaborate about our flink
>> situation.
>> these our flink resource definitions, as they are generated using the
>> helm template command (minus log4j,metrics configuration and some sensitive
>> data)
>> ---
>> # Source: flink/templates/flink-configmap.yaml
>> apiVersion: v1
>> kind: ConfigMap
>> metadata:
>>   name: flink-config
>>   labels:
>> app: flink
>> data:
>>   flink-conf.yaml: |
>> jobmanager.rpc.address: flink-jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.execution.failover-strategy: region
>> jobmanager.memory.process.size: 8g
>> taskmanager.memory.process.size: 24g
>> taskmanager.memory.task.off-heap.size: 1g
>> taskmanager.numberOfTaskSlots: 4
>> queryable-state.proxy.ports: 6125
>> queryable-state.enable: true
>> blob.server.port: 6124
>> parallelism.default: 1
>> state.backend.incremental: true
>> state.backend: rocksdb
>> state.backend.rocksdb.localdir: /opt/flink/rocksdb
>> state.checkpoints.dir: file:///opt/flink/checkpoints
>> classloader.resolve-order: child-first
>> kubernetes.cluster-id: flink-cluster
>> kubernetes.namespace: intel360-beta
>> high-availability:
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> high-availability.storageDir: file:///opt/flink/recovery
>>
>> ---
>> # Source: flink/templates/flink-service.yaml
>> apiVersion: v1
>> kind: Service
>> metadata:
>>   name: flink-jobmanager
>>   labels:
>> {}
>> spec:
>>   ports:
>>   - name: http-ui
>> port: 8081
>> targetPort: http-ui
>>   - name: tcp-rpc
>> port: 6123
>> targetPort: tcp-rpc
>>   - name: tcp-blob
>> port: 6124
>> targetPort: tcp-blob
>>   selector:
>> app: flink
>> component: jobmanager
>> ---
>> # Source: flink/templates/flink-deployment.yaml
>> apiVersion: apps/v1
>> kind: Deployment
>> metadata:
>>   name: flink-jobmanager
>> spec:
>>   replicas: 1
>>   selector:
>> matchLabels:
>>   app: flink
>>   component: jobmanager
>>   template:
>> metadata:
>>   labels:
>> app: flink
>> component: jobmanager
>>   annotations:
>> checksum/config:
>> f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>> spec:
>>   containers:
>>   - name: jobmanager
>> image: flink:1.12.1-scala_2.11-java11
>> args: [ "jobmanager" ]
>> ports:
>> - name: http-ui
>>   containerPort: 8081
>> - name: tcp-rpc
>>   containerPort: 6123
>> - name: tcp-blob
>>   containerPort: 6124
>> resources:
>>   {}
>> # Environment Variables
>> env:
>> - name: ENABLE_CHECKPOINTING
>>   value: "true"
>> - name: JOB_MANAGER_RPC_ADDRESS
>>   value: "flink-jobmanager"
>> volumeMounts:
>> - name: flink-config
>>   mountPath: /opt/flink/conf/flink-conf.yaml
>>   subPath: flink-conf.yaml
>> # NFS mounts
>> - name: flink-checkpoints
>>   mountPath: "/opt/flink/checkpoints"
>> - name: flink-recovery
>>   mountPath: "/opt/flink/recovery"
>>   volumes:
>>   - name: flink-config
>> configMap:
>>  

Netty LocalTransportException: Sending the partition request to 'null' failed

2021-02-16 Thread Matthias Seiler
Hi Everyone,

I'm trying to set up a Flink cluster in standalone mode with two
machines. However, running a job throws the following exception:
`org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
Sending the partition request to 'null' failed`

Here is some background:

Machines:
- node-1: JobManager, TaskManager
- node-2: TaskManager

flink-conf.yaml looks like this:
```
jobmanager.rpc.address: node-1
taskmanager.numberOfTaskSlots: 8
parallelism.default: 2
cluster.evenly-spread-out-slots: true
```

Deploying the cluster works: I can see both TaskManagers in the WebUI.

I ran the streaming WordCount example: `flink run
flink-1.12.1/examples/streaming/WordCount.jar --input lorem-ipsum.txt`
- the job has been submitted
- job failed (with the above exception)
- the log of node-2 also shows the exception; the other logs are
fine (graceful stop)

I played around with the config and observed that
- if parallelism is set to 1, node-1 gets all the slots and node-2 none
- if parallelism is set to 2, each TaskManager occupies 1 TaskSlot (but
fails because of node-2)

I suspect that the problem must be with the communication between
TaskManagers
- job runs successful if
    - node-1 is the **only** node with x TaskManagers (tested with x=1
and x=2)
    - node-2 is the **only** node with x TaskManagers (tested with x=1
and x=2)
- job fails if
    - node-1 **and** node-2 have one TaskManager

The full exception is:
```
org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 547d4d29b3650883aa403cdb5eb1ba5c)
// ... Job failed, Recovery is suppressed by
NoRestartBackoffTimeStrategy, ...
Caused by:
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
Sending the partition request to 'null' failed.
    at
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
    at
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
    at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
    at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
    at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
    at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
    at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.nio.channels.ClosedChannelException
    at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    ... 11 more
```

Thanks in advance,
Matthias



Re: Question

2021-02-22 Thread Matthias Pohl
Hi,
running your job from within your IDE with no specific configuration
provided (like the Flink job examples provided by Flink [1]) means that
you spin up a local Flink cluster (see MiniCluster [2]). This does not have
the web UI enabled by default. You could enable it by calling
`StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);`,
instead. Don't forget to add the `flink-runtime-web` dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
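
For illustration, here's a minimal sketch of what this could look like in a
job's main method (class name and sample elements are made up):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUiExample {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // spins up a local MiniCluster with the web UI enabled (port 8081 by default)
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

        env.fromElements("hello", "flink").print();

        // Note: with such a finite source the job (and thus the local web UI)
        // only lives for a moment; a long-running source is more convenient
        // if you want to click around in the UI.
        env.execute("local-webui-example");
    }
}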

Best,
Matthias

[1] https://github.com/apache/flink/tree/master/flink-examples
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html

On Sat, Feb 20, 2021 at 4:07 AM Abu Bakar Siddiqur Rahman Rocky <
bakar121...@gmail.com> wrote:

> Hi,
>
> I read it:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html
>
> I can run the code in the UI of Apache Flink that is in the bin file of
> Apache Flink. If I run a java code from intellij idea or eclipse, then how
> can I connect the code to apache flink UI?
>
> Thank you!
>
> On Fri, Feb 12, 2021 at 11:43 AM Matthias Pohl 
> wrote:
>
>> Checkpoints are stored in some DFS storage. The location can be specified
>> using state.checkpoints.dir configuration property [1]. You can access the
>> state of a savepoint or checkpoint using the State Processor API [2].
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Fri, Feb 12, 2021 at 5:35 PM Abu Bakar Siddiqur Rahman Rocky <
>> bakar121...@gmail.com> wrote:
>>
>>> Thank you for your reply.
>>>
>>> Another Question:
>>> After Checkpointing, we save our snapshot to a storage. How can we
>>> access the storage?
>>>
>>> is this the source code:
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
>>>
>>> If it is not, could you please provide me the source code to access in
>>> the storage where snapshots are saved?
>>>
>>> Thank you
>>>
>>>
>>>
>>>
>>> On Fri, Feb 12, 2021 at 2:44 AM Matthias Pohl 
>>> wrote:
>>>
>>>> Hi Abu Bakar Siddiqur Rahman,
>>>> Have you had a look at the Flink documentation [1]? It provides
>>>> step-by-step instructions on how to run a job (the Flink binaries provide
>>>> example jobs under ./examples) using a local standalone cluster. This
>>>> should also work on a Mac. You would just need to start the Flink cluster
>>>> (./bin/start-cluster.sh) and submit a job using one of the example jars
>>>> provided in the binaries (e.g. ./bin/flink run -d
>>>> ./examples/streaming/WordCount.jar). You can check the job running in
>>>> Flink's web UI being available under http://localhost:8081 if you use
>>>> the default configuration provided by the Flink binaries.
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html
>>>>
>>>> On Thu, Feb 11, 2021 at 3:45 PM Abu Bakar Siddiqur Rahman Rocky <
>>>> bakar121...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Is there anyone who can inform me how I can connect a Java program to
>>>>> Apache Flink (in mac)?
>>>>>
>>>>> Thank you!
>>>>>
>>>>> Regards,
>>>>> Abu Bakar Siddiqur Rahman
>>>>>
>>>>> On Thu, Feb 11, 2021 at 4:26 AM Abu Bakar Siddiqur Rahman Rocky <
>>>>> bakar121...@gmail.com> wrote:
>>>>>
>>>>>> Hi Chesnay,
>>>>>>
>>>>>> Could you please inform me that how can I connect a Java program to
>>>>>> apache Flink (in mac)?
>>>>>>
>>>>>> Thank you!
>>>>>>
>>>>>> Regards,
>>>>>> Abu Bakar Siddiqur Rahman
>>>>>>
>>>>>> On Wed, Feb 3, 2021, 3:06 AM Chesnay Schepler 
>>>>>> wrote:
>>>>>>
>>>>>>> Sure.
>>>>>>>
>>>>>>>
>&

Re: Question

2021-02-22 Thread Matthias Pohl
Yes, Flink jobs are deployed using `./bin/flink run`. It will use the
configuration in conf/flink-conf.yaml to connect to the Flink cluster.

It looks like you don't have the right dependencies loaded onto your
classpath. Have you had a look at the documentation about project
configuration [1]? This gives you insight into how to set up the dependencies
for your Flink project. "Setting up a Project: Basic Dependencies" [2]
describes the basic requirements for the project dependencies. Maven
Quickstart [3], in contrast, shows you how to initialize a Maven-based Flink
project.
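
As a rough sketch, the basic dependencies in your pom.xml could look like the
following (version and Scala suffix are just examples, so take the exact
values from the quickstart in [3]):

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.12.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.12.1</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

Whether you want the provided scope depends on how you run the job (packaged
for a cluster vs. directly in the IDE); see [2] and [3] for the details.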

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#project-configuration
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#setting-up-a-project-basic-dependencies
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#maven-quickstart

On Mon, Feb 22, 2021 at 5:06 PM Abu Bakar Siddiqur Rahman Rocky <
bakar121...@gmail.com> wrote:

> Hi Matthias Pohl,
>
> Thank you for your reply.
>
> At first, I'm sorry if my question make you confuse. Let me know if it's
> unclear to you.
>
> (1) To run a code in Flink, we will have to use this command:
> ./bin/flink run /home/username/folder/code.jar
>
> Is it correct?
>
> (2) I run a code in eclipse, it gives the error according to attached pic.
> I guess, if (1) is correct, then I still can't run in the Flink due to the
> error of the code.
>
> The code cannot be resolved org.apache or checkpointing mode
>
>
> Thank you
>
> On Mon, Feb 22, 2021, 7:48 AM Matthias Pohl 
> wrote:
>
>> Hi,
>> running your job from within your IDE with no specific configuration
>> provided (like the Flink job examples provided by the Flink [1]) means that
>> you spin up a local Flink cluster (see MiniCluster [2]). This does not have
>> the web UI enabled by default. You could enable it by calling
>> `StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);`,
>> instead. Don't forget to add the `flink-runtime-web` dependency:
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>> Best,
>> Matthias
>>
>> [1] https://github.com/apache/flink/tree/master/flink-examples
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html
>>
>> On Sat, Feb 20, 2021 at 4:07 AM Abu Bakar Siddiqur Rahman Rocky <
>> bakar121...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I read it:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html
>>>
>>> I can run the code in the UI of Apache Flink that is in the bin file of
>>> Apache Flink. If I run a java code from intellij idea or eclipse, then how
>>> can I connect the code to apache flink UI?
>>>
>>> Thank you!
>>>
>>> On Fri, Feb 12, 2021 at 11:43 AM Matthias Pohl 
>>> wrote:
>>>
>>>> Checkpoints are stored in some DFS storage. The location can be
>>>> specified using state.checkpoints.dir configuration property [1]. You can
>>>> access the state of a savepoint or checkpoint using the State Processor API
>>>> [2].
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>>
>>>> On Fri, Feb 12, 2021 at 5:35 PM Abu Bakar Siddiqur Rahman Rocky <
>>>> bakar121...@gmail.com> wrote:
>>>>
>>>>> Thank you for your reply.
>>>>>
>>>>> Another Question:
>>>>> After Checkpointing, we save our snapshot to a storage. How can we
>>>>> access the storage?
>>>>>
>>>>> is this the source code:
>>>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
>>>>>
>>>>> If it is not, could you please provide me the source code to access in
>>>>> the storage where snapshots are saved?
>>>>>
>>>>> Thank you
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Feb 12, 2021 at 2:44 AM Matthias Pohl 
>>>>> wrot

Re: Flink application kept restarting

2021-02-26 Thread Matthias Pohl
Hi Rainie,
the network buffer pool was destroyed for some reason. This happens when
the NettyShuffleEnvironment gets closed which is triggered when an operator
is cleaned up, for instance. Maybe, the timeout in the metric system caused
this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
conversation hoping that he can give more insights.

If I may ask: What Flink version are you using?

Thanks,
Matthias


On Fri, Feb 26, 2021 at 8:39 AM Rainie Li  wrote:

> Hi All,
>
> Our flink application kept restarting and it did lots of RPC calls to a
> dependency service.
>
> *We saw this exception from failed task manager log: *
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
> at
> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
> at
> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(Pro

Re: Get JobId and JobManager RPC Address in RichMapFunction executed in TaskManager

2021-02-26 Thread Matthias Pohl
Hi Sandeep,
thanks for reaching out to the community. Unfortunately, the information
you're looking for is not exposed in a way that you could access it from
within your RichMapFunction. Could you elaborate a bit more on what you're
trying to achieve? Maybe, we can find another solution for your problem.

Best,
Matthias

On Thu, Feb 25, 2021 at 7:43 AM Sandeep khanzode 
wrote:

> Hello,
>
> I am deploying a standalone-job cluster (cluster with a single Job and
> Task Manager instance instantiated with a —job-classname and —job-id).
>
> I have map/flatmap/process functions being executed in the various stream
> functions in the Taskmanager for which I need access to the Job Id and the
> JobManager RPC address. How can I get access to these variables? What
> in-built environment/context/configuration functions exist for this purpose?
>
> I need these two variables for queryable-state.
>
> Thanks


Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-26 Thread Matthias Pohl
Hi Abhishek,
this might be caused by the switch from log4j to log4j2 as the default in
Flink 1.11 [1]. Have you had a chance to look at the logging documentation
[2] to enable log4j again?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.11.html#switch-to-log4j-2-by-default-flink-15672
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/logging.html

On Thu, Feb 25, 2021 at 5:56 AM Abhishek Shukla 
wrote:

> I was getting bean creation logs and spring boot start up logs in Flink
> 1.9 with flink1.9_log4j-cli.properties (attached)
>
> 
> 
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #  http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> 
>
> log4j.rootLogger=INFO, file
>
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=true
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} %-5p 
> %-60c %x - %m%n
>
>
> # Log output from org.apache.flink.yarn to the console. This is used by the
> # CliFrontend class when using a per-job YARN cluster.
> log4j.logger.org.apache.flink.yarn=INFO, console
> log4j.logger.org.apache.flink.yarn.cli.FlinkYarnSessionCli=INFO, console
> log4j.logger.org.apache.hadoop=INFO, console
>
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} 
> %-5p %-60c %x - %m%n
>
> # suppress the warning that hadoop native libraries are not loaded 
> (irrelevant for the client)
> log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
>
> but after updating to Flink 1.12.1 those logs are not getting printed in
> log file attaching flink1.12_log4j-cli.properties
>
> 
> 
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #  http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> 
>
> rootLogger.level = INFO
> rootLogger.appenderRef.file.ref = FileAppender
>
> # Log all infos in the given file
> appender.file.name = FileAppender
> appender.file.type = FILE
> appender.file.append = true
> appender.file.fileName = ${sys:log.file}
> appender.file.layout.type = PatternLayout
> appender.file.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - 
> %m%n
> logger.flink.name = org.apache.flink
> logger.flink.level = INFO
>
> # Log output from org.apache.flink.yarn to the console. This is used by the
> # CliFrontend class when using a per-job YARN cluster.
> logger.yarn.name = org.apache.flink.yarn
> logger.yarn.level = INFO
> logger.yarn.appenderRef.console.ref = ConsoleA

Re: Processing-time temporal join is not supported yet.

2021-02-26 Thread Matthias Pohl
Hi Eric,
it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
thread. Maybe, he has a workaround for your case.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-19830

On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann 
wrote:

> Hello
> Working with flink 1.12.1 i read in the doc that Processing-time temporal
> join is supported for kv like join but when i try i get a:
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Processing-time temporal join is not supported yet.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
>
> my query:
>
> SELECT e.id
> , r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime 
> AS r ON
> e.id = r.id
>
> my s3 table:
>
> CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
>   WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')
>
> my kafka table:
>
> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT,
> proctime AS PROCTIME())
>
>   WITH 
> ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='
> 127.0.0.1:9092','properties.group.id
> '='mygroup','format'='json','scan.startup.mode'='group-offsets', 
> 'properties.enable.auto.commit'='false')
>
>


Re: Flink 1.12.1 example applications failing on a single node yarn cluster

2021-02-26 Thread Matthias Pohl
Hi Debraj,
thanks for reaching out to the Flink community. Without knowing the details
of how you've set up the Single-Node YARN cluster, I would still guess that
it is a configuration issue on the YARN side. Flink does not know about a
.flink folder. Hence, there is no configuration to set this folder.

Best,
Matthias

On Fri, Feb 26, 2021 at 2:40 PM Debraj Manna 
wrote:

> In my setup hadoop-yarn-nodemenager is running with yarn user.
>
> ubuntu@vrni-platform:/tmp/flink$ ps -ef | grep nodemanager
> yarn  4953 1  2 05:53 ?00:11:26
> /usr/lib/jvm/java-8-openjdk/bin/java -Dproc_nodemanager
> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/lib/heap-dumps/yarn
> -XX:+ExitOnOutOfMemoryError -Dyarn.log.dir=/var/log/hadoop-yarn
> -Dyarn.log.file=hadoop-yarn-nodemanager-vrni-platform.log
> -Dyarn.home.dir=/usr/lib/hadoop-yarn -Dyarn.root.logger=INFO,console
> -Djava.library.path=/usr/lib/hadoop/lib/native -Xmx512m
> -Dhadoop.log.dir=/var/log/hadoop-yarn
> -Dhadoop.log.file=hadoop-yarn-nodemanager-vrni-platform.log
> -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str=yarn
> -Dhadoop.root.logger=INFO,RFA -Dhadoop.policy.file=hadoop-policy.xml
> -Dhadoop.security.logger=INFO,NullAppender
> org.apache.hadoop.yarn.server.nodemanager.NodeManager
>
> I was executing the ./bin/flink command as ubuntu user and yarn user does
> not have permission to write to ubuntu's home folder in my setup.
>
> ubuntu@vrni-platform:/tmp/flink$ echo ~ubuntu
> /home/ubuntu
> ubuntu@vrni-platform:/tmp/flink$ echo ~yarn
> /var/lib/hadoop-yarn
>
>
> It appears to me flink needs permission to write to user's home directory
> to create a .flink folder even when the job is submitted in yarn. It is
> working fine for me if I run the flink with yarn user. in my setup.
>
> Just for my knowledge is there any config in flink to specify the location of
> .flink folder?
>
> On Thu, Feb 25, 2021 at 10:48 AM Debraj Manna 
> wrote:
>
>> The same has been asked in StackOverflow
>> <https://stackoverflow.com/questions/66355206/flink-1-12-1-example-application-failing-on-a-single-node-yarn-cluster>
>> also. Any suggestions here?
>>
>> On Wed, Feb 24, 2021 at 10:25 PM Debraj Manna 
>> wrote:
>>
>>> I am trying out flink example as explained in flink docs
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#application-mode>
>>>  in
>>> a single node yarn cluster
>>> <https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html#Standalone_Operation>
>>> .
>>>
>>> On executing
>>>
>>> ubuntu@vrni-platform:~/build-target/flink$ ./bin/flink run-application
>>> -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
>>>
>>> It is failing with the below errors
>>>
>>> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
>>> deploy Yarn Application Cluster
>>> at 
>>> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465)
>>> at 
>>> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> at 
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>>> at 
>>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
>>> Caused by: 
>>> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The 
>>> YARN application unexpectedly switched to state FAILED during deployment.
>>> Diagnostics from YARN: Application application_1614159836384_0045 failed 1 
>>> times (global limit =2; local limit is =1) due to AM Container for 
>>> appattempt_1614159836384_0045_01 exited with  exitCode: -1000
>>> Failing this attempt.Diagnostics: [2021-02-24 16:19:39.409]File 
>>> file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
>>>  does not exist
>&g

Re: Is it possible to specify max process memory in flink 1.8.2, similar to what is possible in flink 1.11

2021-02-26 Thread Matthias Pohl
Hi Bariša,
have you had the chance to analyze the memory usage in more detail? An
OutOfMemoryError might be an indication of a memory leak, which should be
solved rather than worked around by lowering some memory configuration
parameters. Or is it that the off-heap memory is not actually used but blocks
the JVM from using the allocated memory for other things?
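
If it helps, here is a rough sketch of (non-Flink-specific) tools one could
use to look at the native/off-heap side of the TaskManager process (the
config key and flags below are just the usual suspects, so please
double-check them for your setup):

# enable JVM native memory tracking for the TaskManager, e.g. via
#   env.java.opts.taskmanager: -XX:NativeMemoryTracking=summary
# and then, on the machine/container running the TaskManager:
jcmd <taskmanager-pid> VM.native_memory summary
pmap -x <taskmanager-pid> | tail -n 1   # total RSS of the process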

Best,
Matthias

On Fri, Feb 26, 2021 at 10:05 AM Timo Walther  wrote:

> Hi Barisa,
>
> by looking at the 1.8 documentation [1] it was possible to configure the
> off heap memory as well. Also other memory options were already present.
> So I don't think that you need an upgrade to 1.11 immediately. Please
> let us know if you could fix your problem, otherwise we can try to loop
> in other people that should know better.
>
> Regards,
> Timo
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#taskmanager-memory-off-heap
>
>
>
> On 25.02.21 15:50, Bariša wrote:
> > Small update:
> >   we believe that the off heap memory is used by the parquet writer (
> > used in sink to write to S3 )
> >
> > On Wed, 24 Feb 2021 at 23:25, Bariša  > <mailto:barisa.obrado...@gmail.com>> wrote:
> >
> > I'm running flink 1.8.2 in a container, and under heavy load,
> > container gets OOM from the kernel.
> > I'm guessing that that reason for the kernel OOM is large size of
> > the off-heap memory. Is there a way I can limit it in flink 1.8.2?
> >
> > I can see that newer version of flink has a config param, checking
> > here is it possible to do something similar in flink 1.8.2, without
> > a flink upgrade?
> >
> > Cheers,
> > Barisa
> >
>


Re: Flink 1.12.1 example applications failing on a single node yarn cluster

2021-02-26 Thread Matthias Pohl
Hi Debraj,
sorry for misleading you first. You're right. I looked through the code
once more and found something: There's the yarn.staging-directory [1] that
is set to the user's home folder by default. This parameter is used by the
YarnApplicationFileUploader [2] to upload the application files.
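
So setting something like the following in conf/flink-conf.yaml should let
you move the staging files away from the submitting user's home directory
(the path below is just an example):

yarn.staging-directory: file:///tmp/flink-staging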

I hope that helps. Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#yarn-staging-directory
[2]
https://github.com/apache/flink/blob/7e76fc23f0d9f9b4424dcbd479ff95c049214cc6/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L790

On Fri, Feb 26, 2021 at 5:51 PM Debraj Manna 
wrote:

> Thanks Matthias for replying.
>
> Yes there was some yarn configuration issue on my side which I mentioned
> in my last email.
>
> I am starting on flink. So just for my understanding in few links (posted
> below) it is reported that flink needs to create a .flink directory in the
> users home folder. Even though I am not using HDFS with yarn (in
> single-node deployment) but I am also observing the same. Is there a way I
> can configure the location where flink stores the jar and configuration
> file as mentioned in the below link?
>
>
> https://wints.github.io/flink-web//faq.html#the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup
>
> From the above link
>
> *"Flink creates a .flink/ directory in the users home directory where it
> stores the Flink jar and configuration file."*
>
> Same mentioned here
> <https://docs.cloudera.com/csa/1.2.0/installation/topics/csa-hdfs-home-install.html>
> .
>
>
> <https://wints.github.io/flink-web//faq.html#the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup>
>
> On Fri, Feb 26, 2021 at 9:45 PM Matthias Pohl 
> wrote:
>
>> Hi Debraj,
>> thanks for reaching out to the Flink community. Without knowing the
>> details on how you've set up the Single-Node YARN cluster, I would still
>> guess that it is a configuration issue on the YARN side. Flink does not
>> know about a .flink folder. Hence, there is no configuration to set this
>> folder.
>>
>> Best,
>> Matthias
>>
>> On Fri, Feb 26, 2021 at 2:40 PM Debraj Manna 
>> wrote:
>>
>>> In my setup hadoop-yarn-nodemenager is running with yarn user.
>>>
>>> ubuntu@vrni-platform:/tmp/flink$ ps -ef | grep nodemanager
>>> yarn  4953 1  2 05:53 ?00:11:26
>>> /usr/lib/jvm/java-8-openjdk/bin/java -Dproc_nodemanager
>>> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/lib/heap-dumps/yarn
>>> -XX:+ExitOnOutOfMemoryError -Dyarn.log.dir=/var/log/hadoop-yarn
>>> -Dyarn.log.file=hadoop-yarn-nodemanager-vrni-platform.log
>>> -Dyarn.home.dir=/usr/lib/hadoop-yarn -Dyarn.root.logger=INFO,console
>>> -Djava.library.path=/usr/lib/hadoop/lib/native -Xmx512m
>>> -Dhadoop.log.dir=/var/log/hadoop-yarn
>>> -Dhadoop.log.file=hadoop-yarn-nodemanager-vrni-platform.log
>>> -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str=yarn
>>> -Dhadoop.root.logger=INFO,RFA -Dhadoop.policy.file=hadoop-policy.xml
>>> -Dhadoop.security.logger=INFO,NullAppender
>>> org.apache.hadoop.yarn.server.nodemanager.NodeManager
>>>
>>> I was executing the ./bin/flink command as ubuntu user and yarn user
>>> does not have permission to write to ubuntu's home folder in my setup.
>>>
>>> ubuntu@vrni-platform:/tmp/flink$ echo ~ubuntu
>>> /home/ubuntu
>>> ubuntu@vrni-platform:/tmp/flink$ echo ~yarn
>>> /var/lib/hadoop-yarn
>>>
>>>
>>> It appears to me flink needs permission to write to user's home
>>> directory to create a .flink folder even when the job is submitted in
>>> yarn. It is working fine for me if I run the flink with yarn user. in
>>> my setup.
>>>
>>> Just for my knowledge is there any config in flink to specify the
>>> location of .flink folder?
>>>
>>> On Thu, Feb 25, 2021 at 10:48 AM Debraj Manna 
>>> wrote:
>>>
>>>> The same has been asked in StackOverflow
>>>> <https://stackoverflow.com/questions/66355206/flink-1-12-1-example-application-failing-on-a-single-node-yarn-cluster>
>>>> also. Any suggestions here?
>>>>
>>>> On Wed, Feb 24, 2021 at 10:25 PM Debraj Manna 
>>>> wrote:
>>>>
>>>>> I am trying out flink example as explained in flink docs
>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#a

Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-28 Thread Matthias Pohl
Hi Abhishek,
have you also tried to apply the instructions listed in [1]?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/logging.html#configuring-log4j1

On Mon, Mar 1, 2021 at 4:42 AM Abhishek Shukla 
wrote:

> Hi Matthias,
> Thanks for replying,
> I checked both of these pages,
> And I downloaded the zip of flink 1.12.1 so the changes related to log4j2
> are there in property file,
>
> I am able to see the logs of pipeline once application in up, but the logs
> related to application failure or successful bean creation or logs at time
> of post construct are not getting printed out in file, which was happening
> in flink 1.9 with provided log4j-cli.properties file.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink application kept restarting

2021-03-01 Thread Matthias Pohl
Thanks for providing this information, Rainie. Are other issues documented
in the logs besides the TimeoutException in the JM logs which you already
shared? For now, it looks like there was a connection problem between
the TaskManager and the JobManager that caused the shutdown of the operator,
resulting in the NetworkBufferPool being destroyed. For this scenario I
would expect other failures to occur besides the ones you shared.

Best,
Matthias

On Fri, Feb 26, 2021 at 8:28 PM Rainie Li  wrote:

> Thank you Mattias.
> It’s version1.9.
>
> Best regards
> Rainie
>
> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl 
> wrote:
>
>> Hi Rainie,
>> the network buffer pool was destroyed for some reason. This happens when
>> the NettyShuffleEnvironment gets closed which is triggered when an operator
>> is cleaned up, for instance. Maybe, the timeout in the metric system caused
>> this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
>> conversation hoping that he can give more insights.
>>
>> If I may ask: What Flink version are you using?
>>
>> Thanks,
>> Matthias
>>
>>
>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li  wrote:
>>
>>> Hi All,
>>>
>>> Our flink application kept restarting and it did lots of RPC calls to a
>>> dependency service.
>>>
>>> *We saw this exception from failed task manager log: *
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>> Could not forward element to next operator
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> at
>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>>> at
>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> at
>>> org.apache.flin

Re: Flink application kept restarting

2021-03-01 Thread Matthias Pohl
Another question: the timeout of 48 hours sounds strange. Some other part of
the system should have noticed the connection problem earlier, assuming that
you have a reasonably low heartbeat interval configured.
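
For reference, the relevant parameters in flink-conf.yaml are (the values
below should be the defaults, but please double-check them for your version):

heartbeat.interval: 10000   # in ms
heartbeat.timeout: 50000    # in ms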

Matthias

On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl  wrote:

> Thanks for providing this information, Rainie. Are other issues documented
> in the logs besides the TimeoutException in the JM logs which you already
> shared? For now, it looks like that there was a connection problem between
> the TaskManager and the JobManager that caused the shutdown of the operator
> resulting in the NetworkBufferPool to be destroyed. For this scenario I
> would expect other failures to occur besides the ones you shared.
>
> Best,
> Matthias
>
> On Fri, Feb 26, 2021 at 8:28 PM Rainie Li  wrote:
>
>> Thank you Mattias.
>> It’s version1.9.
>>
>> Best regards
>> Rainie
>>
>> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl 
>> wrote:
>>
>>> Hi Rainie,
>>> the network buffer pool was destroyed for some reason. This happens when
>>> the NettyShuffleEnvironment gets closed which is triggered when an operator
>>> is cleaned up, for instance. Maybe, the timeout in the metric system caused
>>> this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
>>> conversation hoping that he can give more insights.
>>>
>>> If I may ask: What Flink version are you using?
>>>
>>> Thanks,
>>> Matthias
>>>
>>>
>>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li 
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Our flink application kept restarting and it did lots of RPC calls to a
>>>> dependency service.
>>>>
>>>> *We saw this exception from failed task manager log: *
>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>>> Could not forward element to next operator
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>> at
>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>> at
>>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>>>> at
>>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>>>> at
>>>> org.apache

Re: Flink application kept restarting

2021-03-03 Thread Matthias Pohl
Hi Rainie,
in general, a destroyed buffer pool usually means that some other exception
occurred that caused the task to fail; as part of the failure handling, the
operator-related network buffer pool is destroyed. That
causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your
case. It looks like you had some timeout problem while fetching data from a
Kafka topic.

Matthias

On Tue, Mar 2, 2021 at 10:39 AM Rainie Li  wrote:

> Thanks for checking, Matthias.
>
> I have another flink job which failed last weekend with the same buffer
> pool destroyed error. This job is also running version 1.9.
> Here is the error I found from the task manager log. Any suggestion what
> is the root cause and how to fix it?
>
> 2021-02-28 00:54:45,943 WARN
>  org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> while canceling task.
> java.lang.RuntimeException: Buffer pool is destroyed.
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at
> com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> --
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio

Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-03-10 Thread Matthias Pohl
Hi Abhishek,
sorry for the late reply. Did you manage to fix it? One remark: Are you
sure you're referring to the right configuration file? log4j-cli.properties
is used for the CLI tool [1]. Or are you trying to get the logs from within
the main method of your job?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/logging.html#configuring-log4j-2

On Thu, Mar 4, 2021 at 1:50 PM Abhishek Shukla 
wrote:

> @Matthis tried this but did not work, normal logs (application logs) are
> coming.
>
> but the startup bean creation or server error log in case of build failure
> are not getting printed in file
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Evenly Spreading Out Source Tasks

2021-03-12 Thread Matthias Pohl
Hi Aeden,
just to be sure: all task managers have the same hardware/memory
configuration, don't they? I'm not 100% sure whether this affects the slot
selection in the end, but it looks like this parameter also has an influence
on the slot matching strategy, which prefers slots with lower resource
utilization [1].

I'm gonna add Chesnay to the thread. He might have more insights here.
@Chesnay are there any other things that might affect the slot selection
when actually trying to evenly spread out the slots?
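
Independent of that, here is a small sketch of the configuration that should
enforce an even distribution in your setup, picking up Arvid's suggestion of
giving each task manager 2 slots so that 18 TMs x 2 slots exactly match the
36 partitions:

cluster.evenly-spread-out-slots: true
taskmanager.numberOfTaskSlots: 2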

Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141

On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson 
wrote:

> Hi Arvid,
>
>   Thanks for responding. I did check the configuration tab of the job
> manager and the setting cluster.evenly-spread-out-slots: true is
> there. However I'm still observing unevenness in the distribution of
> source tasks. Perhaps this additional information could shed light.
>
> Version: 1.12.1
> Deployment Mode: Application
> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
> Flink operator https://github.com/lyft/flinkk8soperator
>
> I did place the setting under the flinkConfig section,
>
> apiVersion: flink.k8s.io/v1beta1
> 
> spec:
>   flinkConfig:
> cluster.evenly-spread-out-slots: true
> high-availability: zookeeper
> ...
> state.backend: filesystem
> ...
>   jobManagerConfig:
> envConfig:
> 
>
> Would you explain how the setting ends up evenly distributing active
> Kafka consumers? Is it a result of just assigning tasks to TM1, TM2,
> TM3 ... TM18 in order and then starting over? In my case I have 36
> partitions and 18 nodes, so after the second pass of the assignment I would
> end up with 2 subtasks of the consumer group on each TM, and then
> subsequent passes result in inactive consumers.
>
>
> Thank you,
> Aeden
>
> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise  wrote:
> >
> > Hi Aeden,
> >
> > the option that you mentioned should have actually caused your desired
> behavior. Can you double-check that it's set for the job (you can look at
> the config in the Flink UI to be 100% sure).
> >
> > Another option is to simply give all task managers 2 slots. In that way,
> the scheduler can only evenly distribute.
> >
> > On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson 
> wrote:
> >>
> >> I have a cluster with 18 task managers 4 task slots each running a
> >> job whose source/sink(s) are declared with FlinkSQL using the Kafka
> >> connector. The topic being read has 36 partitions. The problem I'm
> >> observing is that the subtasks for the sources are not evenly
> >> distributed. For example, 1 task manager will have 4 active source
> >> subtasks and other TM's none. Is there a way to force  each task
> >> manager to have 2 active source subtasks.  I tried using the setting
> >> cluster.evenly-spread-out-slots: true , but that didn't have the
> >> desired effect.
> >>
> >> --
> >> Thank you,
> >> Aeden


Re: Netty LocalTransportException: Sending the partition request to 'null' failed

2021-03-15 Thread Matthias Seiler
Hi Arvid,

I listened to ports with netcat and connected via telnet and each node
can connect to the other and itself.

The `/etc/hosts` file looks like this
```
127.0.0.1   localhost
127.0.1.1   node-2.example.com   node-2

   node-1
```
Is the second line the reason it fails? I also replaced all hostnames
with IP addresses in the config files (flink-conf, workers, masters) but
without effect...

Do you have any ideas what else I could try?

Thanks again,
Matthias

On 2/24/21 2:17 PM, Arvid Heise wrote:
> Hi Matthias,
>
> most of the debug statements are just noise. You can ignore that.
>
> Something with your network seems fishy to me. Either taskmanager 1
> cannot connect to taskmanager 2 (and vice versa), or the taskmanager
> cannot connect locally.
>
> I found this fragment, which seems suspicious
>
> Failed to connect to /127.0.*1*.1:32797. Giving up.
>
> localhost is usually 127.0.0.1. Can you double check that you connect
> from all machines to all machines (including themselves) by opening
> trivial text sockets on random ports?
>
> On Fri, Feb 19, 2021 at 10:59 AM Matthias Seiler
>  <mailto:matthias.sei...@campus.tu-berlin.de>> wrote:
>
> Hi Till,
>
> thanks for the hint, you seem about right. Setting the log level
> to DEBUG reveals more information, but I don't know what to do
> about it.
>
> All logs throw some Java related exceptions:
> `java.lang.UnsupportedOperationException: Reflective
> setAccessible(true) disabled`
> and
> `java.lang.IllegalAccessException: class
> org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0$6
> cannot access class jdk.internal.misc.Unsafe (in module java.base)
> because module java.base does not export jdk.internal.misc to
> unnamed module`
>
> The log of node-2's TaskManager reveals connection problems:
> `org.apache.flink.runtime.net.ConnectionUtils [] -
> Failed to connect from address 'node-2/127.0.1.1': Invalid argument
> (connect failed)`
> `java.net.ConnectException: Invalid argument (connect failed)`
>
> What's more, both TaskManagers (node-1 and node-2) are having
> trouble to load
> `org_apache_flink_shaded_netty4_netty_transport_native_epoll_x86_64`,
> but load some version eventually.
>
>
> There is quite a lot going on here that I don't understand. Can
> you (or someone) shed some light on it and tell me what I could try?
>
> Some more information:
> I appended the following to the `/etc/hosts` file:
> ```
>  node-1
>  node-2
> ```
> And the `flink/conf/workers` consists of:
> ```
> node-1
> node-2
> ```
>
> Thanks,
> Matthias
>
> P.S. I attached the logs for further reference. `` is
> of course the real IP address instead.
>
>
> On 2/16/21 1:56 PM, Till Rohrmann wrote:
>> Hi Matthias,
>>
>> Can you make sure that node-1 and node-2 can talk to each other?
>> It looks to me that node-2 fails to open a connection to the
>> other TaskManager. Maybe the logs give some more insights. You
>> can change the log level to DEBUG to gather more information.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 16, 2021 at 10:57 AM Matthias Seiler
>> > <mailto:matthias.sei...@campus.tu-berlin.de>> wrote:
>>
>> Hi Everyone,
>>
>> I'm trying to set up a Flink cluster in standalone mode with two
>> machines. However, running a job throws the following exception:
>> `org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> Sending the partition request to 'null' failed`
>>
>> Here is some background:
>>
>> Machines:
>> - node-1: JobManager, TaskManager
>> - node-2: TaskManager
>>
>> flink-conf-yaml looks like this:
>> ```
>> jobmanager.rpc.address: node-1
>> taskmanager.numberOfTaskSlots: 8
>> parallelism.default: 2
>> cluster.evenly-spread-out-slots: true
>> ```
>>
>> Deploying the cluster works: I can see both TaskManagers in
>> the WebUI.
>>
>> I ran the streaming WordCount example: `flink run
>> flink-1.12.1/examples/streaming/WordCount.jar --input
>> lorem-ipsum.txt`
>> - the job has been submitted
>> - job failed (

Re: Flink History server ( running jobs )

2021-03-19 Thread Matthias Pohl
Hi Vishal,
yes, as the documentation explains [1]: Only jobs that reached a globally
terminal state are archived into Flink's history server. State information
about running jobs can be retrieved through Flink's REST API.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/historyserver.html#overview
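
For running jobs, the REST API offers for example the following endpoints
(host and port are placeholders, assuming the default REST port 8081):

```
GET http://<jobmanager-host>:8081/jobs            # ids and status of all jobs
GET http://<jobmanager-host>:8081/jobs/overview   # summary of all jobs
GET http://<jobmanager-host>:8081/jobs/<jobid>    # details of a single job
```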

On Wed, Mar 17, 2021 at 10:33 PM Vishal Santoshi 
wrote:

> Hello folks,
>
> Does the Flink history server not provide information for running jobs
> (like the Spark history server does)?
>
> Regards.
>


Re: Netty LocalTransportException: Sending the partition request to 'null' failed

2021-03-22 Thread Matthias Seiler
Thanks a bunch! I replaced 127.0.1.1 with the actual IP address and it
works now :)
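
For anyone running into the same problem: the working layout of /etc/hosts
is roughly the following, i.e. the node names no longer resolve to an
address in the 127.x.x.x loopback range (the addresses below are
placeholders for the machines' real, routable IPs):

```
127.0.0.1     localhost
192.168.0.2   node-2.example.com   node-2
192.168.0.1   node-1.example.com   node-1
```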

On 3/15/21 3:22 PM, Robert Metzger wrote:
> Hey Matthias,
>
> are you sure you can connect to 127.0.1.1, since everything between
> 127.0.0.1 and  127.255.255.255 is bound to the loopback device?:
> https://serverfault.com/a/363098
>
>
>
> On Mon, Mar 15, 2021 at 11:13 AM Matthias Seiler
>  <mailto:matthias.sei...@campus.tu-berlin.de>> wrote:
>
> Hi Arvid,
>
> I listened to ports with netcat and connected via telnet and each
> node can connect to the other and itself.
>
> The `/etc/hosts` file looks like this
> ```
> 127.0.0.1   localhost
> 127.0.1.1   node-2.example.com   node-2
>
>    node-1
> ```
> Is the second line the reason it fails? I also replaced all
> hostnames with IP addresses in the config files (flink-conf,
> workers, masters) but without effect...
>
> Do you have any ideas what else I could try?
>
> Thanks again,
> Matthias
>
> On 2/24/21 2:17 PM, Arvid Heise wrote:
>> Hi Matthias,
>>
>> most of the debug statements are just noise. You can ignore that.
>>
>> Something with your network seems fishy to me. Either taskmanager
>> 1 cannot connect to taskmanager 2 (and vice versa), or the
>> taskmanager cannot connect locally.
>>
>> I found this fragment, which seems suspicious
>>
>> Failed to connect to /127.0.*1*.1:32797. Giving up.
>>
>> localhost is usually 127.0.0.1. Can you double check that you
>> connect from all machines to all machines (including themselves)
>> by opening trivial text sockets on random ports?
>>
>> On Fri, Feb 19, 2021 at 10:59 AM Matthias Seiler
>> > <mailto:matthias.sei...@campus.tu-berlin.de>> wrote:
>>
>> Hi Till,
>>
>> thanks for the hint, you seem about right. Setting the log
>> level to DEBUG reveals more information, but I don't know
>> what to do about it.
>>
>> All logs throw some Java related exceptions:
>> `java.lang.UnsupportedOperationException: Reflective
>> setAccessible(true) disabled`
>> and
>> `java.lang.IllegalAccessException: class
>> 
>> org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0$6
>> cannot access class jdk.internal.misc.Unsafe (in module
>> java.base) because module java.base does not export
>> jdk.internal.misc to unnamed module`
>>
>> The log of node-2's TaskManager reveals connection problems:
>> `org.apache.flink.runtime.net.ConnectionUtils
>> [] - Failed to connect from address 'node-2/127.0.1.1': Invalid argument
>> (connect failed)`
>> `java.net.ConnectException: Invalid argument (connect failed)`
>>
>> What's more, both TaskManagers (node-1 and node-2) are having
>> trouble to load
>> `org_apache_flink_shaded_netty4_netty_transport_native_epoll_x86_64`,
>> but load some version eventually.
>>
>>
>> There is quite a lot going on here that I don't understand.
>> Can you (or someone) shed some light on it and tell me what I
>> could try?
>>
>> Some more information:
>> I appended the following to the `/etc/hosts` file:
>> ```
>>  node-1
>>  node-2
>> ```
>> And the `flink/conf/workers` consists of:
>> ```
>> node-1
>> node-2
>> ```
>>
>> Thanks,
>> Matthias
>>
>> P.S. I attached the logs for further reference. ``
>> is of course the real IP address instead.
>>
>>
>> On 2/16/21 1:56 PM, Till Rohrmann wrote:
>>> Hi Matthias,
>>>
>>> Can you make sure that node-1 and node-2 can talk to each
>>> other? It looks to me that node-2 fails to open a connection
>>> to the other TaskManager. Maybe the logs give some more
>>> insights. You can change the log level to DEBUG to gather
>>> more information.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Feb 16, 2021 at 10:57 AM Matthias Seiler
>>> >>

Re: Read the metadata files (got from savepoints)

2021-03-22 Thread Matthias Pohl
Hi Abdullah,
you might also want to have a look at the State Processor API [1].

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
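
If you want to go down that route, a rough sketch could look like the
following (it needs the flink-state-processor-api dependency; the savepoint
path, the operator uid, the state name and the types are placeholders that
have to match what your job actually wrote):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadSavepoint {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // load the savepoint metadata (the path is a placeholder)
        ExistingSavepoint savepoint = Savepoint.load(
                env, "file:///tmp/savepoints/savepoint-xyz", new MemoryStateBackend());

        // read the keyed state of the operator with the given uid (placeholder)
        DataSet<String> state =
                savepoint.readKeyedState("my-operator-uid", new MyStateReader());
        state.print();
    }

    /** Reads a ValueState<String> called "my-state" for every key. */
    static class MyStateReader extends KeyedStateReaderFunction<String, String> {

        private ValueState<String> value;

        @Override
        public void open(Configuration parameters) {
            value = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("my-state", String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + value.value());
        }
    }
}
```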

On Mon, Mar 22, 2021 at 6:28 AM Congxian Qiu  wrote:

> Hi
>Maybe you can reach to this test[1] for reference
>
> [1]
> https://github.com/apache/flink/blob/a33e6bd390a9935c3e25b6913bed0ff6b4a78818/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java#L55
> Best,
> Congxian
>
>
> Abdullah bin Omar  于2021年3月22日周一 上午11:25写道:
>
>> Hi,
>>
>> (My work is to see the state. So, I have got the save points
>> metadata file where the state is saved)
>>
>> *What is the way to read the metadata files that are saved from
>> savepoints in the local machine?* I guess the file is in binary.
>>
>>
>> Thank you
>>
>> Regards,
>> Abdullah
>>
>>


Re: RocksDB StateBuilder unexpected exception

2021-03-23 Thread Matthias Pohl
Hi Danesh,
thanks for reaching out to the Flink community. Checking the code, it looks
like the OutputStream is added to a CloseableRegistry before writing to it
[1].

My suspicion is - based on the exception cause - that the CloseableRegistry
got triggered while restoring the state. I tried to track down the source
of the CloseableRegistry. It looks like it's handed down from the
StreamTask [2].

The StreamTask closes the CloseableRegistry either when cancelling is
triggered or in the class' finalize method. Have you checked the logs to
see whether there was some task cancellation logged?

Best,
Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L132
[2]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L269

On Fri, Mar 19, 2021 at 5:07 PM dhanesh arole 
wrote:

> Hello Hivemind,
>
> We are running a stateful streaming job. Each task manager instance hosts
> around 100GB of data. During a restart of the task managers we encountered the
> following errors, because of which the job is not able to restart.
> Initially we thought it might be due to failing status checks of the attached
> EBS volumes or burst-balance exhaustion, but the AWS console is not indicating
> any issue with the EBS volumes. Is there anything else that we need to
> look at which could potentially cause this exception? Also, it's quite unclear
> what exactly the cause of the exception is; any help on that would be much
> appreciated.
>
> Flink version: 1.12.2_scala_2.11
> Environment: Kubernetes on AWS
> Volume Type: EBS, gp2 300GiB
>
> *ERROR
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder []
> - Caught unexpected exception.*
> *java.nio.channels.ClosedChannelException: null*
> * at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
> ~[?:?]*
> * at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]*
> * at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]*
> * at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]*
> * at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
> ~[?:?]*
> * at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> ~[?:?]*
> * at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ~[?:?]*
> * at java.lang.Thread.run(Thread.java:830) [?:?]*
> *2021-03-19 15:26:10,385 WARN
>  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
> Exception while restoring keyed state backend for
> KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from
> alternative (1/1), will retry while more alternatives are available.*
> *org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected exception.*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
> * at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>

Re: Evenly distribute task slots across task-manager

2021-03-23 Thread Matthias Pohl
Hi Vignesh,
are you trying to achieve an even distribution of tasks for this one
operator that has the parallelism set to 16? Or do you observe the
described behavior also on a job level?
I'm adding Chesnay to the thread as he might have more insights on this
topic.

Best,
Matthias

On Mon, Mar 22, 2021 at 6:31 PM Vignesh Ramesh 
wrote:

> Hello Everyone,
>
> Can someone help me with a solution?
>
> I have a Flink job (2 task managers) with a job parallelism of 64 and 64
> task slots.
> I have the parallelism of one of the operators set to 16. This operator's
> (parallelism 16) slots are not getting evenly distributed across the two task
> managers. It often takes a higher number of task slots, like 10 or 11, in one
> task manager and 5 or 6 in the other task manager.
>
> I'm using Flink version 1.11.2. I tried adding
> cluster.evenly-spread-out-slots: true
> but it didn't work. Any solution is greatly appreciated.
>
> Thanks in advance,
>
> Regards,
> Vignesh
>
>


Re: Evenly distribute task slots across task-manager

2021-03-23 Thread Matthias Pohl
There was a similar discussion recently in this mailing list about
distributing the work onto different TaskManagers. One finding Xintong
shared there [1] was that the parameter cluster.evenly-spread-out-slots is
used to evenly allocate slots among TaskManagers, but it does not control how
the tasks are actually distributed among the allocated slots. It would be
interesting to
know more about your job. If the upstream operator does some shuffling, you
might run into the issue of the task executions not being distributed
evenly anymore.

Matthias

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Evenly-Spreading-Out-Source-Tasks-tp42108p42235.html

On Tue, Mar 23, 2021 at 1:42 PM Matthias Pohl 
wrote:

> Hi Vignesh,
> are you trying to achieve an even distribution of tasks for this one
> operator that has the parallelism set to 16? Or do you observe the
> described behavior also on a job level?
> I'm adding Chesnay to the thread as he might have more insights on this
> topic.
>
> Best,
> Matthias
>
> On Mon, Mar 22, 2021 at 6:31 PM Vignesh Ramesh 
> wrote:
>
>> Hello Everyone,
>>
>> Can someone help me with a solution?
>>
>> I have a flink job(2 task-managers) with a job parallelism of 64 and task
>> slot of 64.
>> I have a parallelism set for one of the operators as 16. This operator(16
>> parallelism) slots are not getting evenly distributed across two task
>> managers. It often takes higher task slots like 10/11 in one task manager
>> and 5/6 in other task manager.
>>
>> I'am using flink version 1.11.2. I tried adding 
>> cluster.evenly-spread-out-slots:
>> true but it didn't work. Any solution is greatly appreciated.
>>
>> Thanks in advance,
>>
>> Regards,
>> Vignesh
>>
>>


Re: QueryableStateClient getKVState

2021-03-23 Thread Matthias Pohl
Hi Sandeep,
the equals method does not compare this.map with that.map but with
that.dimensions... at least in your commented-out code. Might this be the
problem?

Best,
Matthias
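
P.S. Just to illustrate what I mean, a consistent equals/hashCode pair
that also takes the map into account could be sketched like this (it is
meant to replace the two methods in your MyKey class and relies on
java.util.Objects; whether it fixes the lookup also depends on the key
type the job uses in its keyBy):

```java
// requires: import java.util.Objects;
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    MyKey that = (MyKey) o;
    return Objects.equals(first, that.first)
            && myType == that.myType
            && Objects.equals(second, that.second)
            && Objects.equals(map, that.map); // compare the map, not another field
}

@Override
public int hashCode() {
    // must include exactly the same fields as equals()
    return Objects.hash(first, myType, second, map);
}
```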

On Tue, Mar 23, 2021 at 5:28 AM Sandeep khanzode 
wrote:

> Hi,
>
> I have a stream that exposes the state for Queryable State.
>
> I am using the key as follows:
>
> public class MyKey {
> private Long first;
> private EnumType myType;
> private Long second;
>
> private TreeMap<String, String> map;
>
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> MyKey that = (MyKey) o;
> boolean isEqual = first.longValue() == that.first.longValue() &&
> myTime.name().equalsIgnoreCase(that.myTime.name()) &&
> second.longValue() == that.second.longValue();// &&
> //map.equals(that.dimensions);
> return isEqual;
> }
>
> @Override
> public int hashCode() {
> int result = first != null ? first.hashCode() : 0;
> result = 31 * result + (myType != null ? myType.name().hashCode() : 
> 0);
> result = 31 * result + (second != null ? second.hashCode() : 0);
> //result = 31 * result + (map != null ? map.hashCode() : 0);
> return result;
> }
>
> }
>
>
> If I only set the first three members for the key class, then the key
> lookup works correctly.
>
> If I add the TreeMap, then the lookup always errors with the message: “No
> state found for the given key/namespace”.
>
> What am I doing wrong with the TreeMap as a member in the Key class for
> equals/hashCode?
>
> Thanks,
> Sandeep
>


Re: Flink Streaming Counter

2021-03-23 Thread Matthias Pohl
Hi Vijayendra,
thanks for reaching out to the Flink community. What do you mean by
displaying it in your local IDE? Would it be ok to log the information out
onto stdout? You might want to have a look at the docs about setting up a
slf4j metrics report [1] if that's the case.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On Tue, Mar 23, 2021 at 2:09 AM Vijayendra Yadav 
wrote:

> Hi Team,
>
> Could you provide a sample of how to hook a Flink DataStream source and sink
> into incrementing a COUNTER? I then want to display the counter in my
> local IDE.
> The counter should be displayed for #1 through #3.
>
> 1) DataStream<String> messageStream = env.addSource(Kinesis Source);
> 2) DataStream<String> outputStream =
> messageStream.rebalance().map(CustomMapFunction());
> 3) outputStream.addSink(Streaming File Sink).
>
> public class MyMapper extends RichMapFunction<String, String> {
>   private transient Counter counter;
>
>   @Override
>   public void open(Configuration config) {
> this.counter = getRuntimeContext()
>   .getMetricGroup()
>   .counter("myCounter");
>   }
>
>   @Override
>   public String map(String value) throws Exception {
> this.counter.inc();
> return value;
>   }}
>
>
> Thanks,
> Vijay
>


Re: QueryableStateClient getKVState

2021-03-23 Thread Matthias Pohl
Could you provide the full stacktrace of your error? That might help me to
dig into the code.

Matthias

On Tue, Mar 23, 2021 at 2:33 PM Sandeep khanzode 
wrote:

> Hi Matthias,
>
> Thanks. But yes, I am comparing map with that.map … the comment is
> probably for the previous variable name.
>
> I can use String, Int, Enum, Long type keys in the Key that I send in the
> Query getKvState … but the moment I introduce a TreeMap, even though it
> contains a simple one entry String, String, it doesn’t work …
>
> Thanks,
> Sandeep
>
> On 23-Mar-2021, at 7:00 PM, Matthias Pohl  wrote:
>
> Hi Sandeep,
> the equals method does not compare the this.map with that.map but
> that.dimensions. ...at least in your commented out code. Might this be the
> problem?
>
> Best,
> Matthias
>
> On Tue, Mar 23, 2021 at 5:28 AM Sandeep khanzode 
> wrote:
>
>> Hi,
>>
>> I have a stream that exposes the state for Queryable State.
>>
>> I am using the key as follows:
>>
>> public class MyKey {
>> private Long first;
>> private EnumType myType;
>> private Long second;
>>
>> private TreeMap map;
>>
>> @Override
>> public boolean equals(Object o) {
>> if (this == o) return true;
>> if (o == null || getClass() != o.getClass()) return false;
>> MyKey that = (MyKey) o;
>> boolean isEqual = first.longValue() == that.first.longValue() &&
>> myTime.name().equalsIgnoreCase(that.myTime.name()) &&
>> second.longValue() == that.second.longValue();// &&
>> //map.equals(that.dimensions);
>> return isEqual;
>> }
>>
>> @Override
>> public int hashCode() {
>> int result = first != null ? first.hashCode() : 0;
>> result = 31 * result + (myType != null ? myType.name().hashCode() : 
>> 0);
>> result = 31 * result + (second != null ? second.hashCode() : 0);
>> //result = 31 * result + (map != null ? map.hashCode() : 0);
>> return result;
>> }
>>
>> }
>>
>>
>> If I only set the first three members for the key class, then the key
>> lookup works correctly.
>>
>> If I add the TreeMap, then the lookup always errors with the message; “No
>> state found for the given key/namespace”.
>>
>> What am I dong wrong with the TreeMap as a member in the Key class for
>> equals/hashcode?
>>
>> Thanks,
>> Sandeep
>>
>


Re: pipeline.auto-watermark-interval vs setAutoWatermarkInterval

2021-03-23 Thread Matthias Pohl
Hi Aeden,
sorry for the late reply. I looked through the code and verified that the
JavaDoc is correct. Setting pipeline.auto-watermark-interval to 0 will
disable the automatic watermark generation. I created FLINK-21931 [1] to
cover this.

Thanks,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-21931
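
For completeness, a minimal sketch of setting the interval programmatically
(the 200 ms value is just an example; 0 disables the periodic watermark
generation):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkIntervalExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // emit watermarks every 200 ms; setting 0 disables periodic generation
        env.getConfig().setAutoWatermarkInterval(200);
    }
}
```

The equivalent configuration entry should be
pipeline.auto-watermark-interval: 200 ms.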

On Thu, Mar 4, 2021 at 9:53 PM Aeden Jameson 
wrote:

> Correction: The first link was supposed to be,
>
> 1. pipeline.auto-watermark-interval
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#pipeline-auto-watermark-interval
>
> On Wed, Mar 3, 2021 at 7:46 PM Aeden Jameson 
> wrote:
> >
> > I'm hoping to have my confusion clarified regarding the settings,
> >
> > 1. pipeline.auto-watermark-interval
> >
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
> >
> > 2. setAutoWatermarkInterval
> >
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
> >
> > I noticed the default value of pipeline.auto-watermark-interval is 0
> > and according to these docs,
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#watermark
> ,
> > it states, "If watermark interval is 0ms, the generated watermarks
> > will be emitted per-record if it is not null and greater than the last
> > emitted one." However in the documentation for
> > setAutoWatermarkInterval the value 0 disables watermark emission.
> >
> > * Are they intended to be the same setting? If not how are they
> > different? Is one for FlinkSql and the other DataStream API?
> >
> > --
> > Thank you,
> > Aeden


Re: How to get operator uid from a sql

2021-03-23 Thread Matthias Pohl
Hi XU Qinghui,
sorry for the late reply. Unfortunately, the operator ID is not meant to
be accessible for Flink SQL through the API. You might have a chance to
extract the operator ID through the debug
logs. StreamGraphHasherV2.generateDeterministicHash logs out the operator
ID [1]:
"[main] DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] -
Generated hash 'cbc357ccb763df2852fee8c4fc7d55f2' for node [...]"
The hash refers to the operator ID.

Best,
Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L269
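
To avoid switching everything to DEBUG, it might be enough to enable it
only for that class in the log4j2 properties of the component that builds
the JobGraph (the logger name "hasher" below is arbitrary):

```
logger.hasher.name = org.apache.flink.streaming.api.graph.StreamGraphHasherV2
logger.hasher.level = DEBUG
```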

On Tue, Mar 2, 2021 at 11:56 AM XU Qinghui  wrote:

> Hello folks
>
> I'm trying to use the flink state processor api to read the state of
> operators from a checkpoint. But currently the operator look up in the API
> relies on the operator `uid` (e.g. ExistingSavepoint.readKeyedState(uid,
> readerFunction)).
> But when it comes to a sql job, where should I look up for the operator
> uid? I tried both the hexstring of the hash and the operator name, but
> neither works.
>
> Could you point me in the right direction?
>
> BR,
>


Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-23 Thread Matthias Pohl
Hi Vishal,
I'm not 100% sure what you're trying to do. But the partitioning by a key
just relies on the key and the used parallelism. So, I guess, what you
propose should work.
You would have to rely on some join function, though, when merging two
input operators into one again.

I hope that was helpful.
Best,
Matthias

On Tue, Mar 23, 2021 at 3:29 PM vishalovercome  wrote:

> Suppose I have a job with 3 operators with the following job graph:
>
> O1 => O2 // data stream partitioned by keyBy
> O1 => O3 // data stream partitioned by keyBy
> O2 => O3 // data stream partitioned by keyBy
>
> If operator O3 receives inputs from two operators and both inputs have the
> same type and value for a key then will the two streams end up in the same
> sub-task and therefore affect the same state variables keyed to that
> particular key? Do the streams themselves have to have the same type or is
> it enough that just the keys of each of the input streams have the same
> type
> and value?
>
> If they're not guaranteed to affect the same state then how can we achieve
> the same? I would prefer to use the simple
> RichMapFunction/RichFlatmapFunction for modelling my operators as opposed
> to
> any join function.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Streaming Counter

2021-03-24 Thread Matthias Pohl
Hi Vijayendra,
what about the example from the docs you already referred to [1]?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#counter
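
In case a more complete sketch helps: the following combines the counter
example from the docs with the Slf4jReporter, so that the counter values
show up in the IDE's log output. It assumes the flink-metrics-slf4j
dependency is on the classpath; the names and the interval are just
examples, and the same pattern applies to custom source/sink functions:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CounterExample {

    public static void main(String[] args) throws Exception {
        // let the Slf4jReporter log all metrics periodically
        Configuration conf = new Configuration();
        conf.setString("metrics.reporter.slf4j.class",
                "org.apache.flink.metrics.slf4j.Slf4jReporter");
        conf.setString("metrics.reporter.slf4j.interval", "10 SECONDS");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);

        DataStream<String> input = env.fromElements("a", "b", "c");
        input.map(new CountingMapper()).print();

        env.execute("counter-example");
    }

    static class CountingMapper extends RichMapFunction<String, String> {

        private transient Counter counter;

        @Override
        public void open(Configuration config) {
            counter = getRuntimeContext().getMetricGroup().counter("myCounter");
        }

        @Override
        public String map(String value) {
            counter.inc();
            return value;
        }
    }
}
```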

On Tue, Mar 23, 2021 at 6:48 PM Vijayendra Yadav 
wrote:

> Hi Pohl,
>
> Thanks for getting back to me so quickly. I am looking for a sample
> example where I can increment counters on each stage #1 thru #3 for
> DATASTREAM.
> Then probably I can print it using slf4j.
>
> Thanks,
> Vijay
>
> On Tue, Mar 23, 2021 at 6:35 AM Matthias Pohl 
> wrote:
>
>> Hi Vijayendra,
>> thanks for reaching out to the Flink community. What do you mean by
>> displaying it in your local IDE? Would it be ok to log the information out
>> onto stdout? You might want to have a look at the docs about setting up a
>> slf4j metrics report [1] if that's the case.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>>
>> On Tue, Mar 23, 2021 at 2:09 AM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Team,
>>>
>>> Could you provide a sample how to pass Flink Datastream Source and sink
>>> results to increment COUNTER and then I want to display the Counter in
>>> Local IDE.
>>> Counter to display for #1 through #3.
>>>
>>> 1) DataStream messageStream = env.addSource(Kinesis Source);
>>> 2) DataStream outputStream =
>>> messageStream.rebalance().map(CustomMapFunction());
>>> 3) outputStream.addSink(Streaming File Sink).
>>>
>>> public class MyMapper extends RichMapFunction {
>>>   private transient Counter counter;
>>>
>>>   @Override
>>>   public void open(Configuration config) {
>>> this.counter = getRuntimeContext()
>>>   .getMetricGroup()
>>>   .counter("myCounter");
>>>   }
>>>
>>>   @Override
>>>   public String map(String value) throws Exception {
>>> this.counter.inc();
>>> return value;
>>>   }}
>>>
>>>
>>> Thanks,
>>> Vijay
>>>
>>


Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-24 Thread Matthias Pohl
1. yes - the same key would affect the same state variable
2. you need a join to have the same operator process both streams

Matthias
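
P.S. To make point 2 a bit more concrete, a minimal sketch of such a join
via connect() could look like this (the types, key selectors and the state
are placeholders; both inputs are keyed by the same key type, so elements
with the same key see the same keyed state):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class SharedKeyedStateSketch {

    public static DataStream<String> joinInO3(DataStream<String> fromO1,
                                              DataStream<String> fromO2) {
        return fromO1
                .connect(fromO2)
                .keyBy(v -> v, v -> v) // same key function TK on both inputs (placeholder)
                .process(new KeyedCoProcessFunction<String, String, String, String>() {

                    private transient ValueState<Long> seen;

                    @Override
                    public void open(Configuration parameters) {
                        seen = getRuntimeContext()
                                .getState(new ValueStateDescriptor<>("seen", Long.class));
                    }

                    @Override
                    public void processElement1(String value, Context ctx,
                                                Collector<String> out) throws Exception {
                        long count = seen.value() == null ? 1L : seen.value() + 1;
                        seen.update(count);
                        out.collect("first input, key " + ctx.getCurrentKey() + ", seen " + count);
                    }

                    @Override
                    public void processElement2(String value, Context ctx,
                                                Collector<String> out) throws Exception {
                        long count = seen.value() == null ? 1L : seen.value() + 1;
                        seen.update(count);
                        out.collect("second input, key " + ctx.getCurrentKey() + ", seen " + count);
                    }
                });
    }
}
```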

On Wed, Mar 24, 2021 at 7:29 AM vishalovercome  wrote:

> Let me make the example more concrete. Say O1 gets as input a data stream
> T1
> which it splits into two using some function and produces DataStreams of
> type T2 and T3, each of which are partitioned by the same key function TK.
> Now after O2 processes a stream, it could sometimes send the stream to O3
> (T4) using the same key function again. Now I want to know whether:
>
> 1. Data from streams T3 with key K and T4 with key K end up affecting the
> state variables for the same key K or different. I would think that would
> be
> the case but wanted a confirmation
> 2. An explicit join is needed or not, i.e. whether this will achieve what I
> want:
>
> result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3
> does)
> result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: DataDog and Flink

2021-03-24 Thread Matthias Pohl
Hi Vishal,
what about the TaskManager metrics REST endpoint [1]? Is this something you
could use to get all the metrics for a specific TaskManager? Or are you
looking for something else?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html#taskmanagers-metrics
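
For reference, the calls would look roughly like this (host, port, the
TaskManager id and the metric name are placeholders; without the get
parameter the endpoint just lists the available metric names):

```
GET http://<jobmanager-host>:8081/taskmanagers
GET http://<jobmanager-host>:8081/taskmanagers/<taskmanager-id>/metrics
GET http://<jobmanager-host>:8081/taskmanagers/<taskmanager-id>/metrics?get=Status.JVM.Memory.Heap.Used
```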

On Tue, Mar 23, 2021 at 10:59 PM Vishal Santoshi 
wrote:

> That said, is there a way to get a dump of all metrics exposed by a TM? I
> was searching for it, and I bet we could get it via a ServiceMonitor on k8s
> (scrape), but I am missing a way to hit a TM and dump all metrics that are
> pushed.
>
> Thanks and regards.
>
> On Tue, Mar 23, 2021 at 5:56 PM Vishal Santoshi 
> wrote:
>
>> I guess there is a bigger issue here. We dropped the property to 500. We
>> also realized that this failure happened on a TM that had one specific job
>> running on it. What was good ( but surprising ) that the exception was the
>> more protocol specific 413  ( as in the chunk is greater then some size
>> limit DD has on a request.
>>
>> Failed to send request to Datadog (response was Response{protocol=h2,
>> code=413, message=, url=
>> https://app.datadoghq.com/api/v1/series?api_key=**}
>> )
>>
>> which implies that the Socket timeout was masking this issue. The 2000
>> was just a huge payload that DD was unable to parse in time ( or was slow
>> to upload etc ). Now we could go lower but that makes less sense. We could
>> play with
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#system-scope
>> to reduce the size of the tags ( or keys ).
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Tue, Mar 23, 2021 at 11:33 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> If we look at this
>>> <https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java#L159>
>>> code, the metrics are divided into chunks up to a max size and
>>> enqueued
>>> <https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java#L110>.
>>> The Request
>>> <https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java#L75>
>>> has a 3 second read/connect/write timeout which IMHO should have been
>>> configurable ( or is it ) . While the number metrics ( all metrics )
>>> exposed by flink cluster is pretty high ( and the names of the metrics
>>> along with tags ) , it may make sense to limit the number of metrics in a
>>> single chunk ( to ultimately limit the size of a single chunk ). There is
>>> this configuration which allows for reducing the metrics in a single chunk
>>>
>>> metrics.reporter.dghttp.maxMetricsPerRequest: 2000
>>>
>>> We could decrease this to 1500 ( 1500 is pretty arbitrary, not based on any
>>> empirical reasoning ) and see if that stabilizes the dispatch. It is
>>> inevitable that the number of requests will grow and we may hit the
>>> throttle but then we know the exception rather than the timeouts that are
>>> generally less intuitive.
>>>
>>> Any thoughts?
>>>
>>>
>>>
>>> On Mon, Mar 22, 2021 at 10:37 AM Arvid Heise  wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> I have no experience in the Flink+DataDog setup but worked a bit with
>>>> DataDog before.
>>>> I'd agree that the timeout does not seem like a rate limit. It would
>>>> also be odd that the other TMs with a similar rate still pass. So I'd
>>>> suspect n/w issues.
>>>> Can you log into the TM's machine and try out manually how the system
>>>> behaves?
>>>>
>>>> On Sat, Mar 20, 2021 at 1:44 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Hello folks,
>>>>>   This is quite strange. We see a TM stop reporting
>>>>> metrics to DataDog .The logs from that specific TM  for every DataDog
>>>>> dispatch time out with* java.net.SocketTimeoutException: timeout *and
>>>>> that seems to repeat over every dispatch to DataDog. It seems it is on a 
>>>

Hadoop is not in the classpath/dependencies

2021-03-25 Thread Matthias Seiler
Hello everybody,

I set up a a Flink (1.12.1) and Hadoop (3.2.1) cluster on two machines.
The job should store the checkpoints on HDFS like so:
```java
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
```

Unfortunately, the JobManager throws
```
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 'hdfs'. The scheme is not
directly supported by Flink and no Hadoop file system to support this
scheme could be loaded. For a full list of supported file systems,
please see
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
// ...
Caused by:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
not in the classpath/dependencies.
```
and I don't understand why.

`echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
wildcards. Flink's JobManager prints the classpath, which includes
specific packages from these Hadoop libraries. Besides that, Flink
creates the state directories on HDFS, but no content.
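
One thing I still plan to double-check is whether setting the variable
exactly the way the Flink docs suggest makes a difference, i.e. roughly

```
export HADOOP_CLASSPATH=`hadoop classpath`
```

on every node before running bin/start-cluster.sh.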

Thank you for any advice,
Matthias



Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-03-25 Thread Matthias Pohl
Hi everyone,
considering the upcoming release of Flink 1.13, I wanted to revive the
discussion about the Mesos support once more. Mesos is also already listed
as deprecated in Flink's overall roadmap [1]. Maybe it's time to align the
documentation accordingly to make this more explicit?

What do you think?

Best,
Matthias

[1] https://flink.apache.org/roadmap.html#feature-radar

On Wed, Oct 28, 2020 at 9:40 AM Till Rohrmann  wrote:

> Hi Oleksandr,
>
> yes you are right. The biggest problem is at the moment the lack of test
> coverage and thereby confidence to make changes. We have some e2e tests
> which you can find here [1]. These tests are, however, quite coarse grained
> and are missing a lot of cases. One idea would be to add a Mesos e2e test
> based on Flink's end-to-end test framework [2]. I think what needs to be
> done there is to add a Mesos resource and a way to submit jobs to a Mesos
> cluster to write e2e tests.
>
> [1] https://github.com/apache/flink/tree/master/flink-jepsen
> [2]
> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common
>
> Cheers,
> Till
>
> On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi <
> o.nitavs...@criteo.com> wrote:
>
>> Hello Xintong,
>>
>> Thanks for the insights and support.
>>
>> Browsing the Mesos backlog, we didn't identify anything critical that is
>> left there.
>>
>> I see that there were quite a lot of contributions to Flink's Mesos support
>> in the recent versions:
>> https://github.com/apache/flink/commits/master/flink-mesos.
>> We plan to validate the current Flink master (or the release-1.12 branch) with
>> our Mesos setup. In case of any issues, we will try to propose changes.
>> My feeling is that our test results shouldn't affect the Flink 1.12
>> release cycle, and if any potential commits land in 1.12.1 it
>> should be totally fine.
>>
>> In the future, we would be glad to help you guys with any
>> maintenance-related questions. One of the highest priorities around this
>> component seems to be the development of the full e2e test.
>>
>> Kind Regards
>> Oleksandr Nitavskyi
>> 
>> From: Xintong Song 
>> Sent: Tuesday, October 27, 2020 7:14 AM
>> To: dev ; user 
>> Cc: Piyush Narang 
>> Subject: [BULK]Re: [SURVEY] Remove Mesos support
>>
>> Hi Piyush,
>>
>> Thanks a lot for sharing the information. It would be a great relief that
>> you are good with Flink on Mesos as is.
>>
>> As for the jira issues, I believe the most essential ones should have
>> already been resolved. You may find some remaining open issues here [1],
>> but not all of them are necessary if we decide to keep Flink on Mesos as is.
>>
>> At the moment and in the short future, I think helps are mostly needed on
>> testing the upcoming release 1.12 with Mesos use cases. The community is
>> currently actively preparing the new release, and hopefully we could come
>> up with a release candidate early next month. It would be greatly
>> appreciated if you folks, as experienced Flink on Mesos users, could help with
>> verifying the release candidates.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>> [1]
>> https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open
>>
>> On Tue, Oct 27, 2020 at 2:58 AM Piyush Narang > <mailto:p.nar...@criteo.com>> wrote:
>>
>> Hi Xintong,
>>
>>
>>
>> Do you have any jiras that cover any of the items on 1 or 2? I can reach
>> out to folks internally and see if I can get some folks to commit to
>> helping out.
>>
>>
>>
>> To cover the other qs:
>>
>>   *   Yes, we’ve not got a plan at the moment to get off Mesos. We use
>> Yarn for some of our Flink workloads when we can. Mesos is only used when we
>> need streaming capabilities in our WW dcs (as our Yarn is centralized in
&g

Re: pipeline.auto-watermark-interval vs setAutoWatermarkInterval

2021-03-26 Thread Matthias Pohl
Thanks for double-checking Dawid and thanks for clarifying, Jark. I will
leave the Jira issue open as Jark suggested improving the documentation in
that sense.

Best,
Matthias

On Fri, Mar 26, 2021 at 7:43 AM Jark Wu  wrote:

> IIUC, pipeline.auto-watermark-interval = 0 just disables **periodic**
> watermark emission;
>  it doesn't mean the watermark will never be emitted.
> In Table API/SQL, it has the same meaning. If watermark interval = 0, we
> disable periodic watermark emission,
> and emit watermark once it advances.
>
> So I think the SQL documentation is correct.
>
> Best,
> Jark
>
> On Tue, 23 Mar 2021 at 22:29, Dawid Wysakowicz 
> wrote:
>
>> Hey,
>>
>> I would like to double check this with Jark and/or Timo. As far as
>> DataStream is concerned the javadoc is correct. Moreover the
>> pipeline.auto-watermak-interval and setAutoWatermarkInterval are
>> effectively the same setting/option. However I am not sure if Table API
>> interprets it in the same way as DataStream APi. The documentation you
>> linked, Aeden, describes the SQL API.
>>
>> @Jark @Timo Could you verify if the SQL documentation is correct?
>>
>> Best,
>>
>> Dawid
>> On 23/03/2021 15:20, Matthias Pohl wrote:
>>
>> Hi Aeden,
>> sorry for the late reply. I looked through the code and verified that the
>> JavaDoc is correct. Setting pipeline.auto-watermark-interval to 0 will
>> disable the automatic watermark generation. I created FLINK-21931 [1] to
>> cover this.
>>
>> Thanks,
>> Matthias
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21931
>>
>> On Thu, Mar 4, 2021 at 9:53 PM Aeden Jameson 
>> wrote:
>>
>>> Correction: The first link was supposed to be,
>>>
>>> 1. pipeline.auto-watermark-interval
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#pipeline-auto-watermark-interval
>>>
>>> On Wed, Mar 3, 2021 at 7:46 PM Aeden Jameson 
>>> wrote:
>>> >
>>> > I'm hoping to have my confusion clarified regarding the settings,
>>> >
>>> > 1. pipeline.auto-watermark-interval
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
>>> >
>>> > 2. setAutoWatermarkInterval
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
>>> >
>>> > I noticed the default value of pipeline.auto-watermark-interval is 0
>>> > and according to these docs,
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#watermark
>>> ,
>>> > it states, "If watermark interval is 0ms, the generated watermarks
>>> > will be emitted per-record if it is not null and greater than the last
>>> > emitted one." However in the documentation for
>>> > setAutoWatermarkInterval the value 0 disables watermark emission.
>>> >
>>> > * Are they intended to be the same setting? If not how are they
>>> > different? Is one for FlinkSql and the other DataStream API?
>>> >
>>> > --
>>> > Thank you,
>>> > Aeden
>>
>>


Re: Hadoop is not in the classpath/dependencies

2021-03-30 Thread Matthias Seiler
Thank you all for the replies!


I did as @Maminspapin suggested and indeed the previous error
disappeared, but now the exception is
```
java.io.IOException: Cannot instantiate file system for URI:
hdfs://node-1:9000/flink
//...
Caused by: java.lang.NumberFormatException: For input string: "30s"
// this is thrown by the flink-shaded-hadoop library
```
I thought that it related to the windowing I do, which has a slide
interval of 30 seconds, but removing it yields the same error.

I also added the dependency to the maven pom, but without effect.

Since I use Hadoop 3.2.1, I also tried
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber
but with this I can't even start a cluster (`TaskManager initialization
failed`).



@Robert, Flink includes roughly 100 hdfs jars.
`hadoop-hdfs-client-3.2.1.jar` is one of them and is supposed to contain
`DistributedFileSystem.class`, which I checked running `jar tvf
hadoop-3.2.1/share/hadoop/hdfs/hadoop-hdfs-client-3.2.1.jar | grep
DistributedFileSystem`. How can I verify that the class is really
accessible?
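
One idea I had for checking this is a tiny test class that is run with the
same classpath as the Flink processes (the -cp value is a placeholder):

```java
public class ClassCheck {
    public static void main(String[] args) throws Exception {
        // prints the class and where it was loaded from,
        // or throws ClassNotFoundException if it is not on the classpath
        Class<?> clazz = Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");
        System.out.println(clazz + " loaded from "
                + clazz.getProtectionDomain().getCodeSource().getLocation());
    }
}
```

e.g. `java -cp "$HADOOP_CLASSPATH:." ClassCheck`.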

Cheers,
Matthias

On 3/26/21 10:20 AM, Robert Metzger wrote:
> Hey Matthias,
>
> Maybe the classpath contains hadoop libraries, but not the HDFS
> libraries? The "DistributedFileSystem" class needs to be accessible to
> the classloader. Can you check if that class is available?
>
> Best,
> Robert
>
> On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler
>  <mailto:matthias.sei...@campus.tu-berlin.de>> wrote:
>
> Hello everybody,
>
> I set up a a Flink (1.12.1) and Hadoop (3.2.1) cluster on two
> machines.
> The job should store the checkpoints on HDFS like so:
> ```java
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
> ```
>
> Unfortunately, the JobManager throws
> ```
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
> // ...
> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is
> not in the classpath/dependencies.
> ```
> and I don't understand why.
>
> `echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
> wildcards. Flink's JobManger prints the classpath which includes
> specific packages from these Hadoop libraries. Besides that, Flink
> creates the state directories on HDFS, but no content.
>
> Thank you for any advice,
> Matthias
>


Re: Fw:A question about flink watermark illustration in official documents

2021-03-31 Thread Matthias Pohl
Hi 罗昊,
the 2nd picture is meant to visualize the issue of out-of-orderness in
general. I'd say it's not referring to a specific strategy.

But one way to interpret the image is using the BoundedOutOfOrderness
strategy for watermark generation [1]: You can define an upper bound B for
the out-of-orderness. The watermark generator assumes that there's a delay
of B, i.e. for an event with timestamp T, no events older than {@code T -
B} will follow any more. The delayed watermarks you see in image 2 could be
achieved using this bounded out-of-orderness strategy.

The usage of watermark strategies is also addressed in the docs [2].

I hope this helps.
Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/eventtime/BoundedOutOfOrdernessWatermarks.java#L37
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#generating-watermarks
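
In code, the bounded out-of-orderness strategy would be set up roughly like
this with the newer WatermarkStrategy API (Flink 1.11+); the event type, the
bound and the timestamp extraction are placeholders:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class WatermarkSketch {

    // assumes an event class with a getTimestamp() accessor returning epoch millis
    public static DataStream<MyEvent> withWatermarks(DataStream<MyEvent> events) {
        return events.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // upper bound B
                        .withTimestampAssigner((event, previousTimestamp) -> event.getTimestamp()));
    }

    public static class MyEvent {
        private long timestamp;

        public long getTimestamp() {
            return timestamp;
        }
    }
}
```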

On Tue, Mar 30, 2021 at 6:26 AM 罗昊  wrote:

> Recently I read the official Flink documents for something about watermarks.
> URL:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html
> There are two pictures illustrating the Flink watermark mechanism, which
> puzzle me much:
>
>
> The first picture is easy to understand. But in the second, I wonder how
> we get w(11) and w(17)?
> As we know, we can define how to generate watermarks in the Flink job; in
> other words, watermarks are generated by certain rules. So what are the
> rules by which the watermarks are generated in the second picture?
>
> I looked up almost all official documents of different Flink versions
> and they use the same pictures.
> It puzzled me much. Is there any explanation?
> Waiting for your answers, thanks!
>


Re: DataStream from kafka topic

2021-03-31 Thread Matthias Pohl
Hi Maminspapin,
I haven't worked with Kafka/Flink, yet. But have you had a look at the docs
about the DeserializationSchema [1]? It
mentions ConfluentRegistryAvroDeserializationSchema. Is this something
you're looking for?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
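
In case it helps, a rough sketch of wiring up the consumer with the
Confluent registry deserializer could look like this (the topic, the
registry URL, the properties and the User class are placeholders and have
to match your setup; the important part is that the produced type is
GenericRecord):

```java
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GenericRecordConsumerSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "test-group");          // placeholder

        // the schema has to match the one the producer registered for the topic
        Schema schema = ReflectData.get().getSchema(User.class);

        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                "test_topic",
                ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        schema, "http://registry:8081"), // placeholder registry URL
                props);

        DataStream<GenericRecord> stream = env.addSource(consumer);
        stream.print();

        env.execute();
    }

    // placeholder POJO standing in for the actual model class
    public static class User {
        public String name;
        public int age;
    }
}
```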

On Tue, Mar 30, 2021 at 6:55 AM Maminspapin  wrote:

> I tried this:
>
> 1. Schema (found in stackoverflow)
>
> class GenericRecordSchema implements
> KafkaDeserializationSchema<GenericRecord> {
>
> private String registryUrl;
> private transient KafkaAvroDeserializer deserializer;
>
> public GenericRecordSchema(String registryUrl) {
> this.registryUrl = registryUrl;
> }
>
> @Override
> public boolean isEndOfStream(GenericRecord nextElement) {
> return false;
> }
>
> @Override
> public GenericRecord deserialize(ConsumerRecord<byte[], byte[]>
> consumerRecord) throws Exception {
> checkInitialized();
> return (GenericRecord)
> deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
> }
>
> @Override
> public TypeInformation<GenericRecord> getProducedType() {
> return TypeExtractor.getForClass(GenericRecord.class);
> }
>
> private void checkInitialized() {
> if (deserializer == null) {
> Map<String, Object> props = new HashMap<>();
>
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> registryUrl);
>
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
> SchemaRegistryClient client =
> new CachedSchemaRegistryClient(
> registryUrl,
> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
> deserializer = new KafkaAvroDeserializer(client, props);
> }
> }
> }
>
> 2. Consumer
>
> private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic)
> {
>
> return new FlinkKafkaConsumer<>(
> topic,
> new GenericRecordSchema("http://xxx.xx.xxx.xx:8081";),
> getConsumerProperties());
> }
>
> But when I start the app, the following error is happen:
>
> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
> at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
>
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> at
>
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
> at
>
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
> at
>
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
> at
>
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
>
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
> at
>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(Abs

Re: DataStream from kafka topic

2021-03-31 Thread Matthias Pohl
Ok, it looks like you've found that solution already based on your question
in [1].

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Proper-way-to-get-DataStream-lt-GenericRecord-gt-td42640.html

On Wed, Mar 31, 2021 at 1:26 PM Matthias Pohl 
wrote:

> Hi Maminspapin,
> I haven't worked with Kafka/Flink, yet. But have you had a look at the
> docs about the DeserializationSchema [1]? It
> mentions ConfluentRegistryAvroDeserializationSchema. Is this something
> you're looking for?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>
> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin  wrote:
>
>> I tried this:
>>
>> 1. Schema (found in stackoverflow)
>>
>> class GenericRecordSchema implements
>> KafkaDeserializationSchema {
>>
>> private String registryUrl;
>> private transient KafkaAvroDeserializer deserializer;
>>
>> public GenericRecordSchema(String registryUrl) {
>> this.registryUrl = registryUrl;
>> }
>>
>> @Override
>> public boolean isEndOfStream(GenericRecord nextElement) {
>> return false;
>> }
>>
>> @Override
>> public GenericRecord deserialize(ConsumerRecord
>> consumerRecord) throws Exception {
>> checkInitialized();
>> return (GenericRecord)
>> deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>> }
>>
>> @Override
>> public TypeInformation getProducedType() {
>> return TypeExtractor.getForClass(GenericRecord.class);
>> }
>>
>> private void checkInitialized() {
>> if (deserializer == null) {
>> Map props = new HashMap<>();
>>
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> registryUrl);
>>
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>> SchemaRegistryClient client =
>> new CachedSchemaRegistryClient(
>> registryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>> deserializer = new KafkaAvroDeserializer(client, props);
>> }
>> }
>> }
>>
>> 2. Consumer
>>
>> private static FlinkKafkaConsumer<GenericRecord> getConsumer(String
>> topic) {
>>
>> return new FlinkKafkaConsumer<>(
>> topic,
>> new GenericRecordSchema("http://xxx.xx.xxx.xx:8081";),
>> getConsumerProperties());
>> }
>>
>> But when I start the app, the following error is happen:
>>
>> com.esotericsoftware.kryo.KryoException:
>> java.lang.UnsupportedOperationException
>> Serialization trace:
>> reserved (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>>
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>> at
>>
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.CopyingChaining

Re: Proper way to get DataStream<GenericRecord>

2021-03-31 Thread Matthias Pohl
Hi Maminspapin again,
have you checked whether your topic actually contains data that matches
your schema specified through cep.model.User?

Best,
Matthias
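One way to do that check is to fetch the schema registered for the topic's value subject
and compare it with the schema derived from cep.model.User. The sketch below uses the
Confluent client directly; the "-value" subject name is an assumption (default naming
strategy), and cep.model.User from the thread is assumed to be on the classpath:

```
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class SchemaCheck {

    public static void main(String[] args) throws Exception {
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://xxx.xx.xxx.xx:8081", 100);

        // With the default subject naming strategy the value schema of a topic
        // is registered under "<topic>-value".
        SchemaMetadata latest = client.getLatestSchemaMetadata("test_topic-value");
        Schema writerSchema = new Schema.Parser().parse(latest.getSchema());

        // Schema the job expects (as in the thread).
        Schema readerSchema = ReflectData.get().getSchema(cep.model.User.class);

        // If these differ (e.g. user_visit.Envelope vs. cep.model.User), resolving
        // records against the reader schema fails with an AvroTypeException.
        System.out.println("registered: " + writerSchema.getFullName());
        System.out.println("expected:   " + readerSchema.getFullName());
    }
}
```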

On Tue, Mar 30, 2021 at 3:39 PM Maminspapin  wrote:

> Hi,
>
> I'm trying to solve a task with getting data from topic. This topic keeps
> avro format data.
>
> I wrote next code:
>
>  public static void main(String[] args) throws Exception {
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> Schema schema = ReflectData.get().getSchema(User.class);
> FlinkKafkaConsumer<GenericRecord> userConsumer = new
> FlinkKafkaConsumer<>(
>"test_topic",
> *// First*
> AvroDeserializationSchema.forGeneric(schema),
> *// Second*
> //
> ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
> "http://xxx.xx.xxx.xx:8081";),
> getConsumerProperties());
>
> DataStream<GenericRecord> userStream =
> env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
> userStream.print("users");
>
> env.execute();
> }
>
> So, as I think right, there are two ways to get the result:
> 1. AvroDeserializationSchema.forGeneric(schema)
> 2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
> "http://xxx.xx.xxx.xx:8081";)
>
> And I use ReflectData.get().getSchema(User.class) to get schema.
>
>
> Please, Flink guru, tell me if I am on the right way or not.
>
>
> If I use First way, there is next error:
>
> java.io.EOFException
> at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
> at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:150)
> at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)
>
> If I use Second way, there is next error:
>
> Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
> expecting cep.model.User, missing required field userId
> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>
> How can I get the correct result?
>
> Sorry, if duplicated:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html
>
> Today is third day I'm working with this issue (((
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Restoring from Flink Savepoint in Kubernetes not working

2021-03-31 Thread Matthias Pohl
Hi Claude,
thanks for reaching out to the Flink community. Could you provide the Flink
logs for this run to get a better understanding of what's going on?
Additionally, what exact Flink 1.12 version are you using? Did you also
verify that the snapshot was created by checking the actual folder?

Best,
Matthias

On Wed, Mar 31, 2021 at 4:56 AM Claude M  wrote:

> Hello,
>
> I have Flink setup as an Application Cluster in Kubernetes, using Flink
> version 1.12.  I created a savepoint using the curl command and the status
> indicated it was completed.  I then tried to relaunch the job from that
> save point using the following arguments as indicated in the doc found
> here:
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes
>
> args: ["standalone-job", "--job-classname", "", "--job-id",
> "", "--fromSavepoint", "s3:///",
> "--allowNonRestoredState"]
>
> After the job launches, I check the offsets and they are not the same as
> when the savepoint was created.  The job id passed in also does not match
> the job id that was launched.  I even put an incorrect savepoint path to
> see what happens and there were no errors in the logs and the job still
> launches.  It seems these arguments are not even being evaluated.  Any
> ideas about this?
>
>
> Thanks
>


Re: IO benchmarking

2021-03-31 Thread Matthias Pohl
Hi Deepthi,
1. Have you had a look at flink-benchmarks [1]? I haven't used it but it
might be helpful.
2. Unfortunately, Flink doesn't provide metrics like that. But you might
want to follow FLINK-21736 [2] for future developments.
3. Is there anything specific you are looking for? Unfortunately, I don't
know any blogs for a more detailed summary. If you plan to look into the
code CheckpointCoordinator [3] might be a starting point. Alternatively,
something like MetadataV2V3SerializerBase [4] offers insights into how the
checkpoints' metadata is serialized.

Best,
Matthias

[1] https://github.com/apache/flink-benchmarks
[2] https://issues.apache.org/jira/browse/FLINK-21736
[3]
https://github.com/apache/flink/blob/11550edbd4e1874634ec441bde4fe4952fc1ec4e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1493
[4]
https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83

On Tue, Mar 30, 2021 at 8:37 PM deepthi Sridharan <
deepthi.sridha...@gmail.com> wrote:

> Hi,
>
> I am trying to set up some benchmarking with a couple of IO options for
> saving checkpoints and have a couple of questions :
>
> 1. Does flink come with any IO benchmarking tools? I couldn't find any. I
> was hoping to use those to derive some insights about the storage
> performance and extrapolate it for the checkpoint use case.
>
> 2. Are there any metrics pertaining to restore from checkpoints? The only
> metric I can find is the last restore time, but neither the time it took to
> read the checkpoints, nor the time it took to restore the operator/task
> states seem to be covered. I am using RocksDB, but couldn't find any
> metrics relating to how much time it took to restore the state backend from
> rocksdb either.
>
> 3. I am trying to find documentation on how the states are serialized into
> the checkpoint files from multiple operators and tasks to tailor the
> testing use case, but can't seem to find any. Are there any bogs that go
> into this detail or would reading the code be the only option?
>
> --
> Thanks,
> Deepthi
>


Re: IO benchmarking

2021-03-31 Thread Matthias Pohl
For 2. there are also efforts to expose the state and operator
initialization through the logs (see FLINK-17012 [1]).
For 3. the TypeSerializer [2] might be another point of interest. It is
used to serialize specific types. Other than that, the state
serialzation depends heavily on the used state backend. Hence, you want to
look into RocksDB's SSTables if relying on it as a state backend.

[1] https://issues.apache.org/jira/browse/FLINK-17012
[2]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java

On Thu, Apr 1, 2021 at 1:27 AM deepthi Sridharan <
deepthi.sridha...@gmail.com> wrote:

> Thanks, Matthias. This is very helpful.
>
> Regarding the checkpoint documentation, I was mostly looking for
> information on how states from various tasks get serialized into one (or
> more?) files on persistent storage. I'll check out the code pointers!
>
> On Wed, Mar 31, 2021 at 7:07 AM Matthias Pohl 
> wrote:
>
>> Hi Deepthi,
>> 1. Have you had a look at flink-benchmarks [1]? I haven't used it but it
>> might be helpful.
>> 2. Unfortunately, Flink doesn't provide metrics like that. But you might
>> want to follow FLINK-21736 [2] for future developments.
>> 3. Is there anything specific you are looking for? Unfortunately, I don't
>> know any blogs for a more detailed summary. If you plan to look into the
>> code CheckpointCoordinator [3] might be a starting point. Alternatively,
>> something like MetadataV2V3SerializerBase [4] offers insights into how the
>> checkpoints' metadata is serialized.
>>
>> Best,
>> Matthias
>>
>> [1] https://github.com/apache/flink-benchmarks
>> [2] https://issues.apache.org/jira/browse/FLINK-21736
>> [3]
>> https://github.com/apache/flink/blob/11550edbd4e1874634ec441bde4fe4952fc1ec4e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1493
>> [4]
>> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>>
>> On Tue, Mar 30, 2021 at 8:37 PM deepthi Sridharan <
>> deepthi.sridha...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to set up some benchmarking with a couple of IO options for
>>> saving checkpoints and have a couple of questions :
>>>
>>> 1. Does flink come with any IO benchmarking tools? I couldn't find
>>> any. I was hoping to use those to derive some insights about the storage
>>> performance and extrapolate it for the checkpoint use case.
>>>
>>> 2. Are there any metrics pertaining to restore from checkpoints? The
>>> only metric I can find is the last restore time, but neither the time it
>>> took to read the checkpoints, nor the time it took to restore the
>>> operator/task states seem to be covered. I am using RocksDB, but couldn't
>>> find any metrics relating to how much time it took to restore the state
>>> backend from rocksdb either.
>>>
>>> 3. I am trying to find documentation on how the states are serialized
>>> into the checkpoint files from multiple operators and tasks to tailor the
>>> testing use case, but can't seem to find any. Are there any bogs that go
>>> into this detail or would reading the code be the only option?
>>>
>>> --
>>> Thanks,
>>> Deepthi
>>>
>>
>
> --
> Regards,
> Deepthi
>


Re: JDBC connector support for JSON

2021-04-01 Thread Matthias Pohl
Hi Fanbin,
I'm not that familiar with the FlinkSQL features. But it looks like the
JdbcConnector does not support Json as stated in the documentation [1]. You
might work around it by implementing your own user-defined functions [2].

I hope this helps.
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#data-type-mapping
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html
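To illustrate the UDF workaround: the JSON column could be declared as STRING on the JDBC
side and nested fields extracted with a small scalar function. The sketch below is only one
possible shape of such a function; the function name, the dot-separated path convention and
the use of Jackson are assumptions, not part of Flink's JDBC connector (the Jackson
dependency has to be added to the job):

```
import org.apache.flink.table.functions.ScalarFunction;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/** Returns the value at a dot-separated path inside a JSON string, or null. */
public class JsonField extends ScalarFunction {

    private transient ObjectMapper mapper;

    public String eval(String json, String path) {
        if (json == null || path == null) {
            return null;
        }
        try {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            JsonNode node = mapper.readTree(json);
            for (String part : path.split("\\.")) {
                node = node.path(part);
            }
            return node.isMissingNode() || node.isNull() ? null : node.asText();
        } catch (Exception e) {
            return null; // treat malformed JSON as missing
        }
    }
}

// Registration and usage with the Table API:
// tEnv.createTemporarySystemFunction("JSON_FIELD", JsonField.class);
// tEnv.executeSql("SELECT JSON_FIELD(jsonField1, 'jsonField2.field3') FROM aTable");
```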

On Wed, Mar 31, 2021 at 7:04 AM Fanbin Bu  wrote:

> Hi,
>
> For a streaming job that uses Kafka connector, this doc
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/json.html#format-options
> shows that we can parse json data format. However, it does not seem
> like Flink JDBC connector support json data type, at least from this doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#data-type-mapping
> .
>
> So the question is, does JDBC connector also have this capability? if not,
> what's required to enable it. At the end of the day, I would like to see
> something like this:
>
> create table aTable(field1 type, jsonField1 ROW<jsonField2 ROW<field3 type>>)
> with
> (
> 'connector' = 'jdbc',
> 'url' = '...',
> 'table-name' = 'my-table-with-json-column',
> ...
> )
>
> tEnv.executeSql("select jsonField1.jsonField2.field3 from aTable")
>
> Thanks,
> Fanbin
>


Re: Restoring from Flink Savepoint in Kubernetes not working

2021-04-01 Thread Matthias Pohl
The logs would have helped to understand better what you were doing.

The stacktrace you shared indicates that you either asked for the status of
a savepoint creation that had already been completed and was, therefore,
removed from the operations cache or you used some job ID/request ID
pair that was not connected with any savepoint creation operation.
The operations are only cached for 300 seconds before being removed from
the cache. You could verify that the specific operation did expire and was
removed from the cache in the logs [1] stating something like: "Evicted
result with trigger id {} because its TTL of {}s has expired."

But you should be also able to verify the completion of the savepoint in
the logs.

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java#L104

On Wed, Mar 31, 2021 at 4:46 PM Claude M  wrote:

> Thanks for your reply.  I'm using the flink docker
> image flink:1.12.2-scala_2.11-java8.  Yes, the folder was created in S3.  I
> took a look at the UI and it showed the following:
>
> *Latest Restore ID:* 49
> *Restore Time:* 2021-03-31 09:37:43
> *Type:* Checkpoint
> *Path:* s3:fcc82deebb4565f31a7f63989939c463/chk-49
>
> However, this is different from the savepoint path I specified.  I
> specified the following:
>
> *s3:savepoint2/savepoint-9fe457-504c312ffabe*
>
> Is there anything specific you're looking for in the logs?  I did not find
> any exceptions and there is a lot of sensitive information I would have to
> extract from it.
>
> Also, this morning, I tried creating another savepoint.  It first showed
> it was In Progress.
>
> curl 
> http://localhost:8081/jobs/fcc82deebb4565f31a7f63989939c463/savepoints/4d19307dd99337257c4738871b1c63d8
> {"status":{"id":"IN_PROGRESS"},"operation":null}
>
> Then later when I tried to check the status, I saw the attached
> exception.
>
> In the UI, I see the following:
>
> *Latest Failed Checkpoint ID:* 50
> *Failure Time:* 2021-03-31 09:34:43
> *Cause:* Asynchronous task checkpoint failed.
>
> What does this failure mean?
>
>
> On Wed, Mar 31, 2021 at 9:22 AM Matthias Pohl 
> wrote:
>
>> Hi Claude,
>> thanks for reaching out to the Flink community. Could you provide the
>> Flink logs for this run to get a better understanding of what's going on?
>> Additionally, what exact Flink 1.12 version are you using? Did you also
>> verify that the snapshot was created by checking the actual folder?
>>
>> Best,
>> Matthias
>>
>> On Wed, Mar 31, 2021 at 4:56 AM Claude M  wrote:
>>
>>> Hello,
>>>
>>> I have Flink setup as an Application Cluster in Kubernetes, using Flink
>>> version 1.12.  I created a savepoint using the curl command and the status
>>> indicated it was completed.  I then tried to relaunch the job from that
>>> save point using the following arguments as indicated in the doc found
>>> here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes
>>>
>>> args: ["standalone-job", "--job-classname", "", "--job-id",
>>> "", "--fromSavepoint", "s3:///",
>>> "--allowNonRestoredState"]
>>>
>>> After the job launches, I check the offsets and they are not the same as
>>> when the savepoint was created.  The job id passed in also does not match
>>> the job id that was launched.  I even put an incorrect savepoint path to
>>> see what happens and there were no errors in the logs and the job still
>>> launches.  It seems these arguments are not even being evaluated.  Any
>>> ideas about this?
>>>
>>>
>>> Thanks
>>>
>>


Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-04-14 Thread Matthias Pohl
Thanks for everyone's feedback. I'm gonna initiate a vote in a separate
thread.

On Mon, Mar 29, 2021 at 9:18 AM Robert Metzger  wrote:

> +1
>
>
>
> On Mon, Mar 29, 2021 at 5:44 AM Yangze Guo  wrote:
>
> > +1
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Mar 29, 2021 at 11:31 AM Xintong Song 
> > wrote:
> > >
> > > +1
> > > It's already a matter of fact for a while that we no longer port new
> > features to the Mesos deployment.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Fri, Mar 26, 2021 at 10:37 PM Till Rohrmann 
> > wrote:
> > >>
> > >> +1 for officially deprecating this component for the 1.13 release.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Thu, Mar 25, 2021 at 1:49 PM Konstantin Knauf 
> > wrote:
> > >>>
> > >>> Hi Matthias,
> > >>>
> > >>> Thank you for following up on this. +1 to officially deprecate Mesos
> > in the code and documentation, too. It will be confusing for users if
> this
> > diverges from the roadmap.
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Konstantin
> > >>>
> > >>> On Thu, Mar 25, 2021 at 12:23 PM Matthias Pohl <
> matth...@ververica.com>
> > wrote:
> > >>>>
> > >>>> Hi everyone,
> > >>>> considering the upcoming release of Flink 1.13, I wanted to revive
> the
> > >>>> discussion about the Mesos support ones more. Mesos is also already
> > listed
> > >>>> as deprecated in Flink's overall roadmap [1]. Maybe, it's time to
> > align the
> > >>>> documentation accordingly to make it more explicit?
> > >>>>
> > >>>> What do you think?
> > >>>>
> > >>>> Best,
> > >>>> Matthias
> > >>>>
> > >>>> [1] https://flink.apache.org/roadmap.html#feature-radar
> > >>>>
> > >>>> On Wed, Oct 28, 2020 at 9:40 AM Till Rohrmann  >
> > wrote:
> > >>>>
> > >>>> > Hi Oleksandr,
> > >>>> >
> > >>>> > yes you are right. The biggest problem is at the moment the lack
> of
> > test
> > >>>> > coverage and thereby confidence to make changes. We have some e2e
> > tests
> > >>>> > which you can find here [1]. These tests are, however, quite
> coarse
> > grained
> > >>>> > and are missing a lot of cases. One idea would be to add a Mesos
> > e2e test
> > >>>> > based on Flink's end-to-end test framework [2]. I think what needs
> > to be
> > >>>> > done there is to add a Mesos resource and a way to submit jobs to
> a
> > Mesos
> > >>>> > cluster to write e2e tests.
> > >>>> >
> > >>>> > [1] https://github.com/apache/flink/tree/master/flink-jepsen
> > >>>> > [2]
> > >>>> >
> >
> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common
> > >>>> >
> > >>>> > Cheers,
> > >>>> > Till
> > >>>> >
> > >>>> > On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi <
> > >>>> > o.nitavs...@criteo.com> wrote:
> > >>>> >
> > >>>> >> Hello Xintong,
> > >>>> >>
> > >>>> >> Thanks for the insights and support.
> > >>>> >>
> > >>>> >> Browsing the Mesos backlog and didn't identify anything critical,
> > which
> > >>>> >> is left there.
> > >>>> >>
> > >>>> >> I see that there are were quite a lot of contributions to the
> > Flink Mesos
> > >>>> >> in the recent version:
> > >>>> >> https://github.com/apache/flink/commits/master/flink-mesos.
> > >>>> >> We plan to validate the current Flink master (or release 1.12
> > branch) our
> > >>>> >> Mesos setup. In case of any issues, we will try to propose
> changes.
> > >>>> >> My feeling is that our test results shouldn't affect 

Re: Re: Questions of "State Processing API in Scala"

2020-09-01 Thread Matthias Pohl
Hi Izual,
thanks for contributing and improving the documentation. The PR will be
picked up as part of our regular maintenance work. The communication will
happen through PR conversations as soon as someone picks it up.

Best,
Matthias

On Tue, Sep 1, 2020 at 8:44 AM izual  wrote:

> I tried to fix the small mistake of sample code in State-Processor-API
> doc[1], could someone do a doc review[2] for me, thank you.
>
> 1:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state
> 2: https://github.com/apache/flink/pull/13266
>
>
>
> At 2020-01-21 15:54:56, "Tzu-Li (Gordon) Tai"  wrote:
> >Hi Izual,
> >
> >Thanks for reporting this! I'm also forwarding this to the user mailing
> >list, as that is the more suitable place for this question.
> >
> >I think the usability of the State Processor API in Scala is indeed
> >something that hasn’t been looked at closely yet.
> >
> >On Tue, Jan 21, 2020 at 8:12 AM izual  wrote:
> >
> >> Hi community,
> >>
> >> When I use state in Scala, something makes confused, I followed these
> >> steps to generate and read states:
> >>
> >> a. implements the example[1] `CountWindowAverage` in Scala(exactly same),
> >> and run jobA => that makes good.
> >>
> >> b. execute `flink cancel -s ${JobID}` => savepoints was generated as
> >> expected.
> >>
> >> c. implements the example[2] `StatefulFunctionWithTime` in Scala(code
> >> below), and run jobB => failed, exceptions shows that "Caused by:
> >> org.apache.flink.util.StateMigrationException: The new key serializer must
> >> be compatible."
> >>
> >>
> >> ReaderFunction code as below:
> >>
> >> ```
> >>
> >>   class ReaderFunction extends KeyedStateReaderFunction[Long, (Long,
> >> Long)] {
> >>
> >> var countState: ValueState[(Long, Long)] = _
> >>
> >> override def open(parameters: Configuration): Unit = {
> >>
> >>   val stateDescriptor = new ValueStateDescriptor("average",
> >> createTypeInformation[(Long, Long)])
> >>
> >>   countState = getRuntimeContext().getState(stateDescriptor)
> >>
> >> }
> >>
> >> override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context,
> >> out: Collector[(Long, Long)]): Unit = {
> >>
> >>   out.collect(countState.value())
> >>
> >> }
> >>
> >>   }
> >>
> >> ```
> >>
> >> d. then I try to use java.lang.Long instead of Long in key-type, and run
> >> jobB => exception just disappeared and that makes good.
> >>
> >> This makes me confused. Did I miss some features in State-Processing-API,
> >> such as `magic-implicits`?
> >>
> >
> >This part is explainable. The "magic-implicits" actually happen in the
> >DataStream Scala API.
> >Any primitive Scala types will be inferred and serialized as their Java
> >counterparts.
> >AFAIK, this would not happen in the State Processor API yet and therefore
> >why you are getting the StateMigrationException.
> >When using Scala types directly with the State Processor API, I would guess
> >that Kryo (as a generic fallback) was being used to access state.
> >This can probably be confirmed by looking at the exception stack trace. Can
> >you post a full copy of that?
> >
> >This should be resolvable by properly supporting Scala for the State
> >Processor API, but it's just that up to this point, we didn't have a plan
> >for that yet.
> >Can you open a JIRA for this? I think it'll be a reasonable extension to
> >the API.
> >
> >
> >>
> >> And when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`,same exception comes
> >> again,this time I tried to use Tuple(java.lang.Long) or something else, but
> >> does not work.
> >>
> >
> >I'm not sure what you mean here. Where is this keyBy happening? In the
> >Scala DataStream job, or the State Processor API?
> >
> >
> >>
> >> Hope.
> >>
> >> 1:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
> >>
> >> 2:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
> >
> >
> >Cheers,
> >Gordon
>
>
>
>
>




Re: How can I drop events which are late by more than X hours/days?

2020-09-24 Thread Matthias Pohl
Hi Ori,
one way to do it is to implement a basic ProcessFunction.
ProcessFunction.processElement(I value, Context ctx, Collector<O> out) offers
access to the context through
which you can access the current watermark timestamp using
ctx.timerService().currentWatermark(). That you can use to filter out
delayed events.

Best,
Matthias
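A minimal sketch of such a ProcessFunction is shown below. The class and parameter names
are invented, and it assumes event-time timestamps have been assigned upstream:

```
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Drops events whose timestamp lags the current watermark by more than maxLateness. */
public class DropLateEvents<T> extends ProcessFunction<T, T> {

    private final long maxLatenessMillis;

    public DropLateEvents(long maxLatenessMillis) {
        this.maxLatenessMillis = maxLatenessMillis;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        long watermark = ctx.timerService().currentWatermark();
        Long elementTs = ctx.timestamp(); // null if no timestamp was assigned

        boolean tooLate = watermark > Long.MIN_VALUE // no watermark seen yet -> keep
                && elementTs != null
                && elementTs < watermark - maxLatenessMillis;

        if (!tooLate) {
            out.collect(value);
        }
        // otherwise the element is silently dropped; a side output could be used instead
    }
}

// Usage, e.g. dropping everything more than 5 hours late:
// stream.process(new DropLateEvents<>(5 * 60 * 60 * 1000L));
```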

On Thu, Sep 24, 2020 at 9:59 AM Ori Popowski  wrote:

> I need to drop elements which are delayed by more than a certain amount of
> time from the current watermark.
>
> I wanted to create a FilterFunction where I get the current watermark,
> and if the difference between the watermark and my element's timestamp is
> greater than X - drop the element.
>
> However, I do not have access to the current watermark inside any of
> Flink's operators/functions including FilterFunction.
>
> How can such functionality be achieved?
>




Re: global state and single stream

2020-09-24 Thread Matthias Pohl
Hi Adam,
sorry for the late reply. Introducing a global state is something that
should be avoided as it introduces bottlenecks and/or concurrency/order
issues. Broadcasting the state between different subtasks will also bring a
loss in performance since each state change has to be shared with every
other subtask. Ideally, you might be able to reconsider the design of your
pipeline.

Is there a specific reason that prevents you from doing the merging on a
single instance?

Best,
Matthias
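To illustrate the "merging on a single instance" idea: route every event to one constant
key so a single keyed subtask sees all symbols, keep the prices in MapState, and emit the
full map once every expected symbol has been seen. Everything below is a sketch; the Price
type, its accessors and the expected symbol count are assumptions based on the question:

```
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Hypothetical event type from the question. */
class Price {
    private final String symbol;
    private final double value;

    Price(String symbol, double value) {
        this.symbol = symbol;
        this.value = value;
    }

    String getSymbol() { return symbol; }
    double getValue() { return value; }
}

public class PriceMerger extends KeyedProcessFunction<String, Price, Map<String, Double>> {

    private final int expectedSymbols; // e.g. 26 for 'A'..'Z'
    private transient MapState<String, Double> prices;

    public PriceMerger(int expectedSymbols) {
        this.expectedSymbols = expectedSymbols;
    }

    @Override
    public void open(Configuration parameters) {
        prices = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("prices", String.class, Double.class));
    }

    @Override
    public void processElement(Price price, Context ctx, Collector<Map<String, Double>> out)
            throws Exception {
        prices.put(price.getSymbol(), price.getValue());

        int seen = 0;
        for (String ignored : prices.keys()) {
            seen++;
        }
        if (seen == expectedSymbols) {
            // Emit a snapshot on every update once all symbols have arrived at least once.
            Map<String, Double> snapshot = new HashMap<>();
            for (Map.Entry<String, Double> entry : prices.entries()) {
                snapshot.put(entry.getKey(), entry.getValue());
            }
            out.collect(snapshot);
        }
    }
}

// Usage (forces all records onto one parallel instance, i.e. the bottleneck mentioned above):
// priceStream.keyBy(p -> "ALL").process(new PriceMerger(26));
```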

On Wed, Sep 16, 2020 at 11:21 PM Adam Atrea  wrote:

>
> Hi,
>
> I am pretty new to Flink and I'm trying to implement - which seems to me -
> a pretty basic pattern:
>
> Say I have a single stream of *Price *objects encapsulating a price value
> and a symbol (for example A to Z)  they are emitted at a very random
> interval all day - could be 1 /day or once a week.
>
> *..(price= .22, symbol = 'B')...(price= .12 , symbol = 'C').. (price= .12
> , symbol = 'A'). .(price= .22, symbol = 'Z')...*
>
> I want to define an operator that should emit a single object only once
> when ALL symbols have been received (not before) - the object will include
> all the received prices.
> If a price is received again for the same symbol it will reemit the same
> object with the updated price and all previous prices will stay the same.
>
> If the stream is keyed by symbol I understand that using MapState state
> will not help because the state is local to the partition - each task will
> need to know that all symbols have been received.
>
> Basically I'm looking for a global state for a single keyed stream - a
> state accessible by all parallel tasks - I have used the broadcast pattern
> but I understand this is when connecting 2 streams - is there a way to do
> it without forcing parallelism to 1.
>
> Thanks in advance for your assistance
>
> Adam
>
>
>


Re: Ignoring invalid values in KafkaSerializationSchema

2020-09-24 Thread Matthias Pohl
Hi Yuval,
thanks for bringing this issue up. You're right: There is no error handling
currently implemented for SerializationSchema. FLIP-124 [1] addressed this
for the DeserializationSchema, though. I created FLINK-19397 [2] to cover
this feature.

In the meantime, I cannot think of any other solution than filtering those
rows out in a step before emitting the data to Kafka.

Best,
Matthias

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988
[2] https://issues.apache.org/jira/browse/FLINK-19397
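To illustrate the filtering workaround: a flatMap placed in front of the producer can
attempt the conversion once and drop rows that cannot be serialized, so that the
KafkaSerializationSchema itself only ever sees valid records. The toBytes method below is a
placeholder for whatever conversion the custom schema performs:

```
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/** Forwards only rows that can be serialized; everything else is dropped. */
public class DropUnserializableRows extends RichFlatMapFunction<Row, Row> {

    @Override
    public void flatMap(Row row, Collector<Row> out) {
        try {
            toBytes(row); // same conversion the KafkaSerializationSchema will apply later
            out.collect(row);
        } catch (Exception e) {
            // malformed value: drop it here (or route it to a side output / counter metric)
        }
    }

    private byte[] toBytes(Row row) {
        // ... the actual, possibly failing, serialization logic goes here ...
        return row.toString().getBytes();
    }
}

// Usage:
// rowStream.flatMap(new DropUnserializableRows()).addSink(flinkKafkaProducer);
```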

On Wed, Sep 23, 2020 at 1:12 PM Yuval Itzchakov  wrote:

> Hi,
>
> I'm using a custom KafkaSerializationSchema to write records to Kafka
> using FlinkKafkaProducer. The objects written are Rows coming from Flink's
> SQL API.
>
> In some cases, when trying to convert the Row object to a byte[],
> serialization will fail due to malformed values. In such cases, I would
> like the custom serialization schema to drop the bad records and not send
> them through.
>
> From the API, it is unclear how such failures should be handled. Given the
> following signature:
>
>  ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long
> timestamp);
>
> From reading the code, there's no exception handling or null checking,
> which means that:
>
> - If an exception is thrown, it will cause the entire job to fail (this
> has happened to me in production)
> - If null is passed, a null value will be pushed down to
> kafkaProducer.send which is undesirable.
>
> What are the options here?
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>

