Kafka Connector Topic Discovery

2021-06-10 Thread Martin, Nick J [US] (SP)
I'm trying to use the topic discovery feature of the Kafka Connector. The 
problem I'm having is that Kafka Consumers fail to start if there are no topics 
matching the topic regex when they start up. Is this intended behavior? Is 
there some other property I could set to just continue discovery until they 
find a matching topic?

Background:
My application uses dynamically generated topic names where specific messages 
are sent on different topics based on some metadata in the messages. A producer 
service reads the metadata, determines the topic the data should be sent to, 
applies some validation logic to the topic name, creates that topic if it 
doesn't already exist, and then sends the data. The problem is, when starting 
the whole stack with a fresh Kafka cluster, my Flink job with the Kafka 
consumer can't start until the producer service has been started and has 
processed some data so that at least one matching topic exists. This sort of 
startup order dependency is obviously undesirable in a distributed microservice 
architecture.

Are there existing features/configuration settings that solve this problem? 
Should I open a ticket?
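
For reference, the consumer is set up roughly like the sketch below (a stripped-down, minimal version: the bootstrap server, group id, topic regex, and the String schema are placeholders, and it assumes the universal FlinkKafkaConsumer with topic/partition discovery enabled via the flink.partition-discovery.interval-millis property):

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TopicPatternJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder
        props.setProperty("group.id", "my-group");              // placeholder
        // Re-scan the cluster for new topics/partitions matching the pattern every 30s.
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                Pattern.compile("my-prefix-.*"),                 // placeholder topic regex
                new SimpleStringSchema(),
                props);

        DataStream<String> events = env.addSource(consumer).name("dynamic-topics");
        events.print();

        env.execute("topic-pattern-discovery");
    }
}

With discovery enabled, topics created later that match the pattern are picked up while the job runs; the failure only happens when nothing matches at the moment the consumer starts.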


RE: EXT :Flink solution for having shared variable between task managers

2020-01-17 Thread Martin, Nick J [US] (IS)
I think you’re looking for Broadcast State. Here’s a detailed guide.

https://flink.apache.org/2019/06/26/broadcast-state.html
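
A minimal, self-contained sketch of the pattern, in case it helps (the stream contents and the String key/value types are placeholders):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateSketch {

    // Descriptor for the "shared HashMap": every parallel instance keeps an identical copy.
    static final MapStateDescriptor<String, String> SHARED_MAP = new MapStateDescriptor<>(
            "shared-map", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events  = env.fromElements("a", "b", "c");    // main stream (placeholder)
        DataStream<String> updates = env.fromElements("a=1", "b=2");     // map updates (placeholder)

        BroadcastStream<String> broadcastUpdates = updates.broadcast(SHARED_MAP);

        events.connect(broadcastUpdates)
              .process(new BroadcastProcessFunction<String, String, String>() {
                  @Override
                  public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                      // Read-only lookup against the broadcast map on the data side.
                      String mapped = ctx.getBroadcastState(SHARED_MAP).get(value);
                      out.collect(value + " -> " + mapped);
                  }

                  @Override
                  public void processBroadcastElement(String update, Context ctx, Collector<String> out) throws Exception {
                      // Every parallel instance receives every update, so the copies stay in sync.
                      String[] kv = update.split("=");
                      ctx.getBroadcastState(SHARED_MAP).put(kv[0], kv[1]);
                  }
              })
              .print();

        env.execute("broadcast-state-sketch");
    }
}

The broadcast state is also checkpointed along with the rest of the job, which a plain class-level HashMap would not be.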

From: Soheil Pourbafrani [mailto:soheil.i...@gmail.com]
Sent: Friday, January 17, 2020 6:50 AM
To: user 
Subject: EXT :Flink solution for having shared variable between task managers

Hi,

According to the processing logic, I need to have a HashMap variable that 
should be shared between the taskmanagers. The scenario is the HashMap data 
will be continuously updated according to the incoming stream of data.

What I observed is that declaring the HashMap variable as a class attribute shares it 
among a single taskmanager's slots, but when I have multiple taskmanagers, each has a 
separate HashMap instance.

What is the standard way to achieve this? Does Flink provide any utility for 
that?


RE: EXT :Re: Taskmanagers in Docker Fail to Resolve Own Hostnames and Won't Accept Tasks

2020-01-06 Thread Martin, Nick J [US] (IS)
Yes, the container seems to be resolving its own host name correctly (the Flink 
docker image doesn’t come with nslookup installed, but pinging by host name 
worked). When I did the check, it had been a considerable time since the 
container started, so I can’t rule out a race condition between flink startup 
and container hostname assignment.

Another weird thing I noticed is that the IP being reported by the Jobmanager 
in place of the host name isn’t for an individual container. Instead, it’s the 
virtual IP for the whole task manager service. Which seems strange, since that 
hostname that points to the taskmanager service isn’t something I put in 
Flink’s config files anywhere, and I don’t think containers should be referring 
to themselves by that name.

From: Yang Wang [mailto:danrtsey...@gmail.com]
Sent: Sunday, December 22, 2019 7:15 PM
To: Martin, Nick J [US] (IS) 
Cc: user 
Subject: EXT :Re: Taskmanagers in Docker Fail to Resolve Own Hostnames and 
Won't Accept Tasks

Hi Martin,

Could you `docker exec` into the problematic taskmanager and check whether the 
hostname could
be resolved to a correct ip? You could use `nslookup {tm_hostname}` to verify.


Best,
Yang

On Sat, Dec 21, 2019 at 6:07 AM, Martin, Nick J [US] (IS) <nick.mar...@ngc.com> wrote:
I’m running Flink 1.7.2 in a Docker swarm. Intermittently, new task managers 
will fail to resolve their own host names when starting up. In the log I see 
“no hostname could be resolved” messages coming from TaskManagerLocation. The 
webUI on the jobmanager shows the taskmanagers as associated/connected with the 
jobmanager, but their akka paths show their IP rather than the container name that 
‘good’ taskmanagers show. Those taskmanagers that are listed by IP 
give ‘failed to connect’ errors when new jobs are started that try to use those 
taskmanagers, and that job eventually fails. But the taskmanagers with this 
condition still give regular heartbeats to the Jobmanager, so the jobmanager 
keeps trying to assign work to them. Does anyone know what’s going on here?


Taskmanagers in Docker Fail to Resolve Own Hostnames and Won't Accept Tasks

2019-12-20 Thread Martin, Nick J [US] (IS)
I'm running Flink 1.7.2 in a Docker swarm. Intermittently, new task managers 
will fail to resolve their own host names when starting up. In the log I see 
"no hostname could be resolved" messages coming from TaskManagerLocation. The 
webUI on the jobmanager shows the taskmanagers as associated/connected with the 
jobmanager, but their akka paths show their IP rather than the container name that 
'good' taskmanagers show. Those taskmanagers that are listed by IP 
give 'failed to connect' errors when new jobs are started that try to use those 
taskmanagers, and that job eventually fails. But the taskmanagers with this 
condition still give regular heartbeats to the Jobmanager, so the jobmanager 
keeps trying to assign work to them. Does anyone know what's going on here?


RE: EXT :Re: Jar Uploads in High Availability (Flink 1.7.2)

2019-10-21 Thread Martin, Nick J [US] (IS)
So I think what you’re saying is if I use a DFS for web.upload.dir, my clients 
can send all their requests to any Job Manager instance and not worry or care 
which one is the leader. That definitely is an improvement, thanks.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Friday, October 18, 2019 6:42 AM
To: Martin, Nick J [US] (IS) 
Cc: Ravi Bhushan Ratnakar ; user 

Subject: Re: EXT :Re: Jar Uploads in High Availability (Flink 1.7.2)

Hi Martin,

Flink's web UI based job submission is not well suited to be run behind a load 
balancer at the moment. The problem is that the web based job submission is 
actually a two phase operation: Uploading the jars and then starting the job. 
Since Flink's RestServer stores the uploaded files locally, it is required that 
the web submission is executed on the same RestServer to which you also 
uploaded the files before. Note, however, that the cli client job submission is 
not affected by this since the job graph upload and submission is one request.

A workaround to make the uploads accessible to all RestServers is to configure 
a DFS for the `web.upload.dir` as Ravi suggested or to use Flink's CLI to 
submit jobs instead.

A quick note about the old behaviour with the redirects. The redirects actually 
defied the purpose of load balancers because all requests were redirected to a 
single RestServer instance. Hence, running it with or w/o load balancer should 
not have made a big difference.

Cheers,
Till

On Wed, Oct 16, 2019 at 5:58 PM Martin, Nick J [US] (IS) 
mailto:nick.mar...@ngc.com>> wrote:
Yeah, I’ll do that if I have to. I’m hoping there’s a ‘right’ way to do it 
that’s easier. If I have to implement the zookeeper lookups in my load balancer 
myself, that feels like a definite step backwards from the pre-1.5 days when 
the cluster would give 307 redirects to the current leader.

From: Ravi Bhushan Ratnakar 
[mailto:ravibhushanratna...@gmail.com<mailto:ravibhushanratna...@gmail.com>]
Sent: Tuesday, October 15, 2019 10:35 PM
To: Martin, Nick J [US] (IS) mailto:nick.mar...@ngc.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: EXT :Re: Jar Uploads in High Availability (Flink 1.7.2)

Hi,

I was also experiencing similar behavior. I adopted the following approach:

  *   Used a distributed file system (in my case AWS EFS) and set the attribute 
"web.upload.dir"; this way both job managers have the same location.
  *   On the load balancer side (AWS ELB), I used a "readiness probe" based on the 
zookeeper entry for the active jobmanager address; this way the ELB always points to 
the active jobmanager. If the active jobmanager changes, it automatically points to 
the new active jobmanager, and since both use the same location on the distributed 
file system, the new active jobmanager is able to find the same jar.

Regards,
Ravi

On Wed, Oct 16, 2019 at 1:15 AM Martin, Nick J [US] (IS) 
mailto:nick.mar...@ngc.com>> wrote:
I’m seeing that when I upload a jar through the rest API, it looks like only 
the Jobmanager that received the upload request is aware of the newly uploaded 
jar. That worked fine for me in older versions where all clients were 
redirected to connect to the leader, but now that each Jobmanager accepts 
requests, if I send a jar upload request, it could end up on any one (and only 
one) of the Jobmanagers, not necessarily the leader. Further, each Jobmanager 
responds to a GET request on the /jars endpoint with its own local list of 
jars. If I try and use one of the Jar IDs from that request, my next request 
may not go to the same Jobmanager (requests are going through Docker and being 
load-balanced), and so the Jar ID isn’t found on the new Jobmanager handling 
that request.






RE: EXT :Re: Jar Uploads in High Availability (Flink 1.7.2)

2019-10-16 Thread Martin, Nick J [US] (IS)
Yeah, I’ll do that if I have to. I’m hoping there’s a ‘right’ way to do it 
that’s easier. If I have to implement the zookeeper lookups in my load balancer 
myself, that feels like a definite step backwards from the pre-1.5 days when 
the cluster would give 307 redirects to the current leader.

From: Ravi Bhushan Ratnakar [mailto:ravibhushanratna...@gmail.com]
Sent: Tuesday, October 15, 2019 10:35 PM
To: Martin, Nick J [US] (IS) 
Cc: user 
Subject: EXT :Re: Jar Uploads in High Availability (Flink 1.7.2)

Hi,

I was also experiencing similar behavior. I adopted the following approach:

  *   Used a distributed file system (in my case AWS EFS) and set the attribute 
"web.upload.dir"; this way both job managers have the same location.
  *   On the load balancer side (AWS ELB), I used a "readiness probe" based on the 
zookeeper entry for the active jobmanager address; this way the ELB always points to 
the active jobmanager. If the active jobmanager changes, it automatically points to 
the new active jobmanager, and since both use the same location on the distributed 
file system, the new active jobmanager is able to find the same jar.

Regards,
Ravi

On Wed, Oct 16, 2019 at 1:15 AM Martin, Nick J [US] (IS) 
mailto:nick.mar...@ngc.com>> wrote:
I’m seeing that when I upload a jar through the rest API, it looks like only 
the Jobmanager that received the upload request is aware of the newly uploaded 
jar. That worked fine for me in older versions where all clients were 
redirected to connect to the leader, but now that each Jobmanager accepts 
requests, if I send a jar upload request, it could end up on any one (and only 
one) of the Jobmanagers, not necessarily the leader. Further, each Jobmanager 
responds to a GET request on the /jars endpoint with its own local list of 
jars. If I try and use one of the Jar IDs from that request, my next request 
may not go to the same Jobmanager (requests are going through Docker and being 
load-balanced), and so the Jar ID isn’t found on the new Jobmanager handling 
that request.






Jar Uploads in High Availability (Flink 1.7.2)

2019-10-15 Thread Martin, Nick J [US] (IS)
I'm seeing that when I upload a jar through the rest API, it looks like only 
the Jobmanager that received the upload request is aware of the newly uploaded 
jar. That worked fine for me in older versions where all clients were 
redirected to connect to the leader, but now that each Jobmanager accepts 
requests, if I send a jar upload request, it could end up on any one (and only 
one) of the Jobmanagers, not necessarily the leader. Further, each Jobmanager 
responds to a GET request on the /jars endpoint with its own local list of 
jars. If I try and use one of the Jar IDs from that request, my next request 
may not go to the same Jobmanager (requests are going through Docker and being 
load-balanced), and so the Jar ID isn't found on the new Jobmanager handling 
that request.






RE: [EXTERNAL] Re: How to restart/recover on reboot?

2019-06-18 Thread Martin, Nick
Jobmanager.sh takes an optional argument for the hostname to bind to, and 
start-cluster uses it. If you leave it blank, the script will use whatever is in 
flink-conf.yaml (localhost is the default value that ships with Flink).

The dockerized version of flink runs pretty much the way you’re trying to 
operate (i.e. each node starts itself), so the entrypoint script out of that is 
probably a good source of information about how to set it up.

From: PoolakkalMukkath, Shakir [mailto:shakir_poolakkalmukk...@comcast.com]
Sent: Tuesday, June 18, 2019 2:15 PM
To: Till Rohrmann ; John Smith 
Cc: user 
Subject: EXT :Re: [EXTERNAL] Re: How to restart/recover on reboot?

Hi Till, John,

I do agree with the issue John mentioned and have the same problem.

We can only start a standalone HA cluster with the ./start-cluster.sh script. And 
then when there are failures, we can restart those components individually by 
calling jobmanager.sh / taskmanager.sh. This works great.

But, like John mentioned, if we want to start the cluster initially by running 
jobmanager.sh on each JobManager node, it is not working. It binds to local and does 
not form the HA cluster.

Thanks,
Shakir

From: Till Rohrmann mailto:trohrm...@apache.org>>
Date: Tuesday, June 18, 2019 at 4:23 PM
To: John Smith mailto:java.dev@gmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: [EXTERNAL] Re: How to restart/recover on reboot?

I guess it should work if you installed a systemd service which simply calls 
`jobmanager.sh start` or `taskmanager.sh start`.

Cheers,
Till

On Tue, Jun 18, 2019 at 4:29 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
Yes, that is understood. But I don't see why we cannot call jobmanager.sh and 
taskmanager.sh to build the cluster and have them run as systemd units.

I looked at start-cluster.sh and all it does is SSH and call jobmanager.sh, which 
then cascades to taskmanager.sh. I just have to pinpoint what's missing to have the 
systemd service working. In fact, calling jobmanager.sh as a systemd service actually 
sees the shared masters, slaves and flink-conf.yaml. But it binds to localhost.

Maybe one way to do it would be to bootstrap the cluster with ./start-cluster.sh and 
then install systemd services for jobmanager.sh and taskmanager.sh.

Like I said I don't want to have some process in place to remind admins they 
need to manually start a node every time they patch or a host goes down for 
what ever reason.

On Tue, 18 Jun 2019 at 04:31, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
When a single machine fails you should rather call `taskmanager.sh 
start`/`jobmanager.sh start` to start a single process. `start-cluster.sh` will 
start multiple processes on different machines.

Cheers,
Till

On Mon, Jun 17, 2019 at 4:30 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
Well some reasons, machine reboots/maintenance etc... Host/VM crashes and 
restarts. And same goes for the job manager. I don't want/need to have to 
document/remember some start process for sys admins/devops.

So far I have looked at ./start-cluster.sh and all it seems to do is SSH into 
all the specified nodes and starts the processes using the jobmanager and 
taskmanager scripts. I don't see anything special in any of the sh scripts.
I configured passwordless ssh through terraform and all that works great only 
when trying to do the manual start through systemd. I may have something 
missing...

On Mon, 17 Jun 2019 at 09:41, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
Hi John,

I have not much experience wrt setting Flink up via systemd services. Why do 
you want to do it like that?

1. In standalone mode, Flink won't automatically restart TaskManagers. This 
only works on Yarn and Mesos atm.
2. In case of a lost TaskManager, you should run `taskmanager.sh start`. This 
script simply starts a new TaskManager process.
3. I guess you could use systemd to bring up a Flink TaskManager process on 
start up.

Cheers,
Till

On Fri, Jun 14, 2019 at 5:56 PM John Smith 
mailto:java.dev@gmail.com>> wrote:
I looked into the start-cluster.sh and I don't see anything special. So 
technically it should be as easy as installing Systemd services to run 
jobamanger.sh and taskmanager.sh respectively?

On Wed, 12 Jun 2019 at 13:02, John Smith 
mailto:java.dev@gmail.com>> wrote:
The installation instructions do not indicate how to create systemd services.

1- When task nodes fail, will the job leader detect this and ssh and restart 
the task node? From my testing it doesn't seem like it.
2- How do we recover a lost node? Do we simply go back to the master node and 
run start-cluster.sh and the script is smart enough to figure out what is 
missing?
3- Or do we need to create systemd services and if so on which command do we 
start the service on?


RE: EXT :How to config user for passwordless ssh?

2019-06-11 Thread Martin, Nick
Env.ssh.opts is the literal argument string to ssh as you would enter it on the 
command line. Take a look at TMSlaves() in config.sh to see exactly how it’s 
being used.

From: John Smith [mailto:java.dev@gmail.com]
Sent: Tuesday, June 11, 2019 12:30 PM
To: user 
Subject: EXT :How to config user for passwordless ssh?

Hi, is it possible to change the default user from root to something else?

When we run ./start-cluster.sh it tries to ssh using root user.

I see env.ssh.opts in the docs, but it doesn't say how to configure the options. If 
that's even the case?




RE: EXT :read only mode for Flink UI

2019-04-25 Thread Martin, Nick
AFAIK, there are no granular permissions like that built into Flink. Limiting 
access to the REST API seems like a good place to start. The web UI uses the 
API, but controlling it there means you’re locking down all means of access. 
The designers of the API were disciplined about what HTTP verbs were used, so 
allowing all GET requests and denying PUT/POST/DELETE/PATCH would mean read 
only access, and I think that would be straightforward to implement with an 
HTTP proxy.

From: uday bhaskar [mailto:uday...@gmail.com]
Sent: Thursday, April 25, 2019 6:57 AM
To: user@flink.apache.org
Subject: EXT :read only mode for Flink UI

Hi

We are looking at running Flink on Kubernetes in Job cluster mode. As part of 
our plans we do not want to allow modifications to the job cluster once a job 
is running. For this we are looking at a "read-only" Flink UI, that does not 
allow users to cancel a job or submit a job.

My question is,
1. Is there such an option when we bring up a Flink cluster currently
2. If no, is this something we can contribute?

I can imagine another solution where the "cancel" and "submit job" options 
mutates the job clusters.

Wanted to check what are the general guidelines on this.

Any pointers would be appreciated

Uday




RE: EXT :Re: Flink 1.7.1 Inaccessible

2019-03-04 Thread Martin, Nick
Seye, are you running Flink and Zookeeper in Docker? I’ve had problems with 
Jobmanagers not resolving the hostnames for Zookeeper when starting a stack on 
Docker.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Monday, March 04, 2019 7:02 AM
To: Seye Jin 
Cc: user 
Subject: EXT :Re: Flink 1.7.1 Inaccessible

Hi Seye,

usually, Flink's web UI should be accessible after a successful leader 
election. Could you share with us the cluster logs to see what's going on? 
Without this information it is hard to tell what's going wrong.

What you could also do is to check the ZooKeeper znode which represents the 
cluster id (if you are using Yarn it should be something like 
/flink/application_...). There you could check the contents of the leader znode 
of the web ui (leader/rest_server_lock). It should contain the address of the 
current leader if there is one.

Cheers,
Till

On Sat, Mar 2, 2019 at 5:29 AM Seye Jin 
mailto:seyej...@gmail.com>> wrote:
I am getting "service temporarily unavailable due to an ongoing leader 
election" when I try to access Flink UI. The jobmanager has HA configured, I 
have tried to restart jobmanager multiple times but no luck. I also tried 
submitting my job from console but I also get the same message.
When I view logs during JM restart I see no errors, it even says "jobmanager 
was granted leadership with ..."
Any hints to try and remediate this issue will be much appreciated. I have 
multiple stateful applications running so I cannot start a new cluster(since I 
am unable to do a savepoint also).
Thanks





RE: EXT :Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Martin, Nick
Yeah, that’s expected/known. Watermarks for the empty partition don’t advance, 
so the window in your window function never closes.

There’s a ticket open to fix it 
(https://issues.apache.org/jira/browse/FLINK-5479) for the kafka connector, but 
in general any time one parallel instance of a source function isn’t getting 
data you have to watch out for this.
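
One workaround I've seen while a source partition stays empty is a periodic watermark assigner that falls back to wall-clock time after an idle timeout. A rough sketch (the timeout, out-of-orderness bound, and timestamp extraction are placeholders, and it deliberately trades event-time correctness for progress, so it only makes sense if event timestamps track wall-clock time):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventWallClock = System.currentTimeMillis();

    public IdleAwareWatermarkAssigner(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // Placeholder: in a real job the timestamp comes out of the element itself.
        long ts = previousElementTimestamp;
        maxTimestamp = Math.max(maxTimestamp, ts);
        lastEventWallClock = System.currentTimeMillis();
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastEventWallClock > idleTimeoutMs) {
            // This parallel instance has seen no data for a while (e.g. an empty shard),
            // so advance the watermark from wall-clock time to let downstream windows close.
            return new Watermark(now - maxOutOfOrdernessMs);
        }
        return new Watermark(maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrdernessMs);
    }
}

It would be attached with assignTimestampsAndWatermarks() in place of the plain BoundedOutOfOrdernessTimestampExtractor.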

From: Stephen Connolly [mailto:stephen.alan.conno...@gmail.com]
Sent: Tuesday, February 19, 2019 6:32 AM
To: user 
Subject: EXT :Re: How to debug difference between Kinesis and Kafka

Hmmm, my suspicions are now quite high. I created a file source that just replays the 
events straight, and then I get more results.

On Tue, 19 Feb 2019 at 11:50, Stephen Connolly 
mailto:stephen.alan.conno...@gmail.com>> wrote:
Hmmm after expanding the dataset such that there was additional data that ended 
up on shard-0 (everything in my original dataset was coincidentally landing on 
shard-1) I am now getting output... should I expect this kind of behaviour if 
no data arrives at shard-0 ever?

On Tue, 19 Feb 2019 at 11:14, Stephen Connolly 
mailto:stephen.alan.conno...@gmail.com>> wrote:
Hi, I’m having a strange situation and I would like to know where I should 
start trying to debug.

I have set up a configurable swap in source, with three implementations:

1. A mock implementation
2. A Kafka consumer implementation
3. A Kinesis consumer implementation

From injecting a log and no-op map function I can see that all three sources 
pass through the events correctly.

I then have a window based on event time stamps… and from inspecting the 
aggregation function I can see that the data is getting aggregated…, I’m using 
the `.aggregate(AggregateFunction.WindowFunction)` variant so that I can 
retrieve the key

Here’s the strange thing, I only change the source (and each source uses the 
same deserialization function) but:


  *   When I use either Kafka or my Mock source, the WindowFunction gets called 
as events pass the end of the window
  *   When I use the Kinesis source, however, the window function never gets 
called. I have even tried injecting events into kinesis with really high 
timestamps to flush the watermarks in my 
BoundedOutOfOrdernessTimestampExtractor... but nothing
I cannot see how this source switching could result in such a different 
behaviour:

Properties sourceProperties = new Properties();
ConsumerFactory sourceFactory;
String sourceName = configParams.getRequired("source");
switch (sourceName.toLowerCase(Locale.ENGLISH)) {
    case "kinesis":
        sourceFactory = FlinkKinesisConsumer::new;
        copyOptionalArg(configParams, "aws-region", sourceProperties, AWSConfigConstants.AWS_REGION);
        copyOptionalArg(configParams, "aws-endpoint", sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
        copyOptionalArg(configParams, "aws-access-key", sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
        copyOptionalArg(configParams, "aws-secret-key", sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
        copyOptionalArg(configParams, "aws-profile", sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
        break;
    case "kafka":
        sourceFactory = FlinkKafkaConsumer010::new;
        copyRequiredArg(configParams, "bootstrap-server", sourceProperties, "bootstrap.servers");
        copyOptionalArg(configParams, "group-id", sourceProperties, "group.id");
        break;
    case "mock":
        sourceFactory = MockSourceFunction::new;
        break;
    default:
        throw new RuntimeException("Unknown source '" + sourceName + '\'');
}

// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// poll watermark every second because using BoundedOutOfOrdernessTimestampExtractor
env.getConfig().setAutoWatermarkInterval(1000L);
env.enableCheckpointing(5000);

SplitStream eventsByType = env.addSource(sourceFactory.create(
        configParams.getRequired("topic"),
        new ObjectNodeDeserializationSchema(),
        sourceProperties
))
        .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
        .name("raw-events")
        .assignTimestampsAndWatermarks(
                new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
        )
        .split(new JsonNodeOutputSelector("eventType"));
...
eventsByType.select(...)
        .keyBy(new JsonNodeStringKeySelector("_key"))
        .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),

RE: EXT :Re: StreamingFileSink cannot get AWS S3 credentials

2019-01-16 Thread Martin, Nick
Does that mean that the full set of fs.s3a.<…> configs in core-site.xml will be 
picked up from flink-conf.yaml by flink? Or only certain configs involved with 
authentication?

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Wednesday, January 16, 2019 3:43 AM
To: Vinay Patil 
Cc: Kostas Kloudas ; Dawid Wysakowicz 
; Taher Koitawala [via Apache Flink User Mailing List 
archive.] ; user 
Subject: EXT :Re: StreamingFileSink cannot get AWS S3 credentials

I haven't configured this myself but I would guess that you need to set the 
parameters defined here under S3A Authentication methods [1]. If the 
environment variables don't work, then I would try to set the authentication 
properties.

[1] 
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A

Cheers,
Till

On Wed, Jan 16, 2019 at 11:09 AM Vinay Patil 
mailto:vinay18.pa...@gmail.com>> wrote:
Hi Till,

Can you please let us know the configurations that we need to set for Profile 
based credential provider in flink-conf.yaml

Exporting AWS_PROFILE property on EMR did not work.

Regards,
Vinay Patil


On Wed, Jan 16, 2019 at 3:05 PM Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
The old BucketingSink was using Hadoop's S3 filesystem directly, whereas the new 
StreamingFileSink uses Flink's own FileSystem, which needs to be configured via the 
flink-conf.yaml.

Cheers,
Till

On Wed, Jan 16, 2019 at 10:31 AM Vinay Patil 
mailto:vinay18.pa...@gmail.com>> wrote:
Hi Till,

We are not providing `fs.s3a.access.key: access_key`, `fs.s3a.secret.key: 
secret_key` in flink-conf.yaml as we are using Profile based credentials 
provider. The older BucketingSink code is able to get the credentials and write 
to S3. We are facing this issue only with StreamingFileSink. We tried adding 
fs.s3a.impl to core-site.xml when the default configurations were not working.

Regards,
Vinay Patil


On Wed, Jan 16, 2019 at 2:55 PM Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
Hi Vinay,

Flink's file systems are self contained and won't respect the core-site.xml if 
I'm not mistaken. Instead you have to set the credentials in the flink 
configuration flink-conf.yaml via `fs.s3a.access.key: access_key`, 
`fs.s3a.secret.key: secret_key` and so on [1]. Have you tried this out?

This has been fixed with Flink 1.6.2 and 1.7.0 [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems.html#built-in-file-systems
[2] https://issues.apache.org/jira/browse/FLINK-10383

Cheers,
Till

On Wed, Jan 16, 2019 at 10:10 AM Kostas Kloudas 
mailto:k.klou...@da-platform.com>> wrote:
Hi Taher,

So you are using the same configuration files and everything and the only thing 
you change is the "s3://" to "s3a://" and the sink cannot find the credentials?
Could you please provide the logs of the Task Managers?

Cheers,
Kostas

On Wed, Jan 16, 2019 at 9:13 AM Dawid Wysakowicz 
mailto:dwysakow...@apache.org>> wrote:

Forgot to cc ;)
On 16/01/2019 08:51, Vinay Patil wrote:
Hi,

Can someone please help on this issue? We have even tried to set fs.s3a.impl in 
core-site.xml, but it's still not working.

Regards,
Vinay Patil


On Fri, Jan 11, 2019 at 5:03 PM Taher Koitawala [via Apache Flink User Mailing 
List archive.] 
mailto:ml%2bs2336050n25464...@n4.nabble.com>>
 wrote:
Hi All,
 We have implemented S3 sink in the following way:

StreamingFileSink sink = StreamingFileSink
        .forBulkFormat(new Path("s3a://mybucket/myfolder/output/"),
                ParquetAvroWriters.forGenericRecord(schema))
        .withBucketCheckInterval(50L)
        .withBucketAssigner(new CustomBucketAssigner())
        .build();

The problem we are facing is that StreamingFileSink is initializing the S3AFileSystem 
class to write to S3 and is not able to find the S3 credentials to write data. 
However, other Flink applications on the same cluster that use "s3://" paths are able 
to write data to the same S3 bucket and folders; we are only facing this issue with 
StreamingFileSink.

Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163



S3 StreamingFileSink never completes multipart uploads

2019-01-02 Thread Martin, Nick
I'm running on Flink 1.7.0 trying to use the StreamingFileSink with an S3A URI. 
What I'm seeing is that whenever the RollingPolicy determines that it's time to 
roll to a new part file, the whole Sink just hangs, and the in progress 
MultiPart Upload never gets completed. I've looked at the traffic between Flink 
and the S3 endpoint, and I don't ever see the POST message that should close 
off a completed upload. Has anyone else run into something like that?



Nick Martin





RE: EXT :Re: Custom S3 endpoint

2018-12-20 Thread Martin, Nick
Yeah, I figured that part out. I’ve tried to make it work with 2.7 and 2.8, and 
it looks like the prebuilt jars have actually moved to Hadoop 3.

From: Paul Lam [mailto:paullin3...@gmail.com]
Sent: Tuesday, December 18, 2018 7:08 PM
To: Martin, Nick 
Cc: user@flink.apache.org
Subject: EXT :Re: Custom S3 endpoint

Hi Nick,

What version of Hadoop are you using? AFAIK, you must use Hadoop 2.7+ to 
support custom s3 endpoint, or the `fs.s3a.endpoint` property in core-site.xml 
would be ignored.

Best,
Paul Lam


On Dec 19, 2018, at 06:40, Martin, Nick <nick.mar...@ngc.com> wrote:

I’m working on Flink 1.7.0 and I’m trying to use the built in S3 libraries like 
readFile(‘s3://bucket/object’) or 
StreamingFileSink. My storage provider is not AWS, but they implement the same 
API. So I need to point the S3 client to a different address. The Hadoop 
documentation shows that there are options in core-site to set that up. The 
problem is, I can’t seem to get the right dependencies together to use the S3 
filesystem. As far as I can tell, the pre-built Hadoop/Presto jars don’t use 
core-site.xml, and the instructions for manual setup given here 
(https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/aws.html#hadoop-provided-s3-file-systems---manual-setup)
 list a set of dependencies that seems to be completely wrong.

How can I use the S3 sources/sinks with a custom HTTP endpoint?



Nick Martin




Custom S3 endpoint

2018-12-18 Thread Martin, Nick
I'm working on Flink 1.7.0 and I'm trying to use the built in S3 libraries like 
readFile('s3://bucket/object') or StreamingFileSink. My storage provider is not 
AWS, but they implement the same API. So I need to point the S3 client to a 
different address. The Hadoop documentation shows that there are options in 
core-site to set that up. The problem is, I can't seem to get the right 
dependencies together to use the S3 filesystem. As far as I can tell, the 
pre-built Hadoop/Presto jars don't use core-site.xml, and the instructions for 
manual setup given here 
(https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/aws.html#hadoop-provided-s3-file-systems---manual-setup)
 list a set of dependencies that seems to be completely wrong.

How can I use the S3 sources/sinks with a custom HTTP endpoint?



Nick Martin





RE: Parallelism and keyed streams

2018-07-16 Thread Martin, Nick
Is value(index-1) stored in Keyed State, or just a local variable inside the 
operator?
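
For reference, a minimal sketch of keeping value(index-1) in keyed state rather than in a plain field (the (channel, index, value) tuple shape below is an assumption based on the description in the quoted message):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RatioFunction extends RichFlatMapFunction<Tuple3<Integer, Long, Double>, Tuple3<Integer, Long, Double>> {

    private transient ValueState<Double> previousValue;

    @Override
    public void open(Configuration parameters) {
        previousValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("previous-value", Double.class));
    }

    @Override
    public void flatMap(Tuple3<Integer, Long, Double> in, Collector<Tuple3<Integer, Long, Double>> out) throws Exception {
        Double prev = previousValue.value();   // scoped to the current key (channel)
        if (prev != null && prev != 0.0) {
            out.collect(Tuple3.of(in.f0, in.f1, in.f2 / prev));   // value(index) / value(index-1)
        }
        previousValue.update(in.f2);
    }
}

Used as stream.keyBy(t -> t.f0).flatMap(new RatioFunction()), the state is scoped per channel, so each channel's previous value is tracked independently regardless of parallelism.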

-Original Message-
From: Nicholas Walton [mailto:nwal...@me.com] 
Sent: Monday, July 16, 2018 1:33 PM
To: user@flink.apache.org
Subject: Parallelism and keyed streams

I have a stream of tuples (channel, index, value), which I form into a keyedStream 
using keyBy on channel. I then need to process each channel in parallel. Each 
parallel stream must be processed in strict sequential order by index to calculate 
the ratios value(index)/value(index-1). If I set parallelism to 1 all is well; each 
channel is processed in order of index 1, 2, 3, 4…

My problem is when I set parallelism to a value greater than 1, each channel’s 
keyedStream appears to be split across multiple processes. So a channel may be 
processed wrongly, for example as value(2), value(5), value(6), value(9)…

The number of channels N is unknown. So how do I rig up N processing streams 
with an unknown parallelism so that each stream processes each channel by 
strictly increasing index v(1),v(2),…..v(t),v(t+1),…..v(t+n)

Thanks in advance

Nick Walton




RE: Regarding the use of RichAsyncFunction as a FlatMap

2018-07-12 Thread Martin, Nick
Regarding the ‘partiality’ you mention in option one, wouldn’t you have to give 
that up anyway to maintain exactly once processing? Suppose input message A 
results in asynchronous queries/futures B and C, and imagine the following 
series of events:

1.   Your function receives A

2.   Asynchronous calls start for B and C

3.   B completes, and its result is immediately emitted.

4.   Your job is restarted before C returns

How would Flink restart the job without either dropping C or duplicating B?  I 
think the only answer is for the function to hold on to results generated by A 
until they have all completed, then emit them all at once, and I have a 
suspicion that under the hood, that’s what a flat map is doing.
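
For what it's worth, here is a rough sketch of the "emit them all at once" version, leaning on the fact that ResultFuture.complete() takes a Collection, so one input can produce several output elements (the queryB/queryC lookups are placeholders standing in for real async clients):

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class FanOutAsyncFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String a, ResultFuture<String> resultFuture) {
        // Input A fans out to two async calls (B and C).
        CompletableFuture<String> b = queryB(a);
        CompletableFuture<String> c = queryC(a);

        // Complete the ResultFuture exactly once, with both results together.
        b.thenCombine(c, (resultB, resultC) -> Arrays.asList(resultB, resultC))
         .whenComplete((results, error) -> {
             if (error != null) {
                 resultFuture.completeExceptionally(error);
             } else {
                 resultFuture.complete(results);
             }
         });
    }

    // Placeholder async lookups standing in for real external calls.
    private CompletableFuture<String> queryB(String a) {
        return CompletableFuture.supplyAsync(() -> a + "-B");
    }

    private CompletableFuture<String> queryC(String a) {
        return CompletableFuture.supplyAsync(() -> a + "-C");
    }
}

It would be wired up with AsyncDataStream.unorderedWait(stream, new FanOutAsyncFunction(), 5, TimeUnit.SECONDS), or the ordered variant if output order matters.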

From: Konstantinos Barmpis [mailto:konstantinos.barm...@york.ac.uk]
Sent: Thursday, July 12, 2018 8:42 AM
To: user@flink.apache.org
Subject: Regarding the use of RichAsyncFunction as a FlatMap

I was wondering if there is a way to create an asynchronous flatmap function in 
Flink.

As far as I am aware, the asynchronous function only accepts a single result 
future as its return (which can be the aggregate list of the flatmap, but then 
partiality is lost as we have to wait for all of the results to be aggregated 
before returning).

Alternatively, using a normal FlatMap seems to require a way to keep the 
current instance alive as it waits for the results and emits them one at a 
time, as otherwise Flink may consider it terminated (as the function call 
returns immediately) and close the channel.

Is there some third option I am unaware of (or using one of the above two in a 
different way?),

Cheers.




--
Konstantinos Barmpis | Research Associate
White Rose Grid Enterprise Systems Group
Dept. of Computer Science
University of York
Tel: +44 (0) 1904-32 5653

Email Disclaimer:
http://www.york.ac.uk/docs/disclaimer/email.htm




String Interning

2018-06-22 Thread Martin, Nick
I have a job where I read data from Kafka, do some processing on it, and write 
it to a database. When I read data out of Kafka, I put it into an object that 
has a String field based on the Kafka message key. The possible values for the 
message key are tightly constrained so there are fewer than 100 possible unique 
key values. Profiling of the Flink job shows millions of in flight stream 
elements, with an equal number of Strings, but I know all the strings are 
duplicates of a small number of unique values.  So it's an ideal usecase for 
String interning. I've tried to use interning in the constructors for the 
message elements, but I suspect that I need to do something to preserve the 
interning when Flink serializes/deserializes objects when passing them between 
operators. What's the best way to accomplish that?
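
For illustration, the kind of thing I have in mind is re-applying intern() on the consuming side of each serialization boundary, e.g. a map chained right after the source (the Tuple2<String, byte[]> element type here is just a stand-in for my real message class):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class InternKeyFunction implements MapFunction<Tuple2<String, byte[]>, Tuple2<String, byte[]>> {

    @Override
    public Tuple2<String, byte[]> map(Tuple2<String, byte[]> element) {
        // Deserialization allocates a fresh String, so interning done in the original
        // constructor is lost whenever Flink ships the element between operators;
        // it has to be re-applied on the consuming side.
        element.f0 = element.f0.intern();
        return element;
    }
}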







Custom Partitioning for Keyed Streams

2018-01-09 Thread Martin, Nick
I have a set of stateful operators that rely on keyed state. There is substantial 
skew between keys (i.e. there will be 100 messages on keys A and B, and 10 
messages each on keys C-J), and key selection is dictated by the needs of my 
application such that I can't choose keys in a way that will 
eliminate the skew. The skew is somewhat predictable (i.e. I know keys A and B 
will usually get roughly 10x as many messages as the rest) and fairly 
consistent on different timescales (i.e. counting the messages on each key for 
30 seconds would provide a reasonably good guess as to the distribution of 
messages that will be received over the next 10-20 minutes).

The problem I'm having is that often the high volume keys (A and B in the 
example) end up on the same task slot and slow it down, while the low volume 
ones are distributed across the other operators, leaving them underloaded. I 
looked into the available physical partitioning functions, but it looks like 
that functionality is generally incompatible with keyed streams, and I need 
access to keyed state to do my actual processing. Is there any way I can get 
better load balancing while using keyed state?
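
For reference, the usual "key salting" workaround looks roughly like the sketch below. It only applies when the per-key work can be split into a partial step and a final merge step, so I'm not sure it fits arbitrary keyed state (the Tuple3 element shape and the salt fan-out are placeholders):

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;

// (key, salt, count) elements; the salt spreads a hot key over several subtasks.
public class AddSalt implements MapFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>> {

    private static final int SALT_BUCKETS = 8;   // placeholder fan-out per hot key

    @Override
    public Tuple3<String, Integer, Long> map(Tuple3<String, Integer, Long> in) {
        // The salt only needs to be random-ish; it is dropped again after the pre-aggregation.
        in.f1 = ThreadLocalRandom.current().nextInt(SALT_BUCKETS);
        return in;
    }
}

// Usage sketch:
//   stream.map(new AddSalt())
//         .keyBy(t -> t.f0 + "#" + t.f1)   // spreads each hot key over up to 8 subtasks
//         .reduce(...)                      // keyed, partial pre-aggregation
//         .keyBy(t -> t.f0)                 // re-key on the original key
//         .reduce(...);                     // final merge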

