Glad to see that!

However, I was told that directly extending `AbstractUdfStreamOperator`
is not the right approach in the DataStream API. This is expected to be
addressed at some point (maybe Flink 2.0). The JIRA link is [1].
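
Until that is resolved, one way to avoid depending on the operator's
runtime internals is the workaround Felipe mentioned earlier in this
thread: resolve the JobManager address on the client side and pass it to
the operator as a constructor argument. Below is a minimal, Flink-free
sketch of that idea. `ControllerEndpoint`, its methods, and the port 1883
(the usual MQTT port) are hypothetical names and values chosen for
illustration; they are not part of any Flink API. Since Flink ships user
functions to the TaskManagers via Java serialization, the sketch only
checks that the address survives a serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical holder for the controller address; NOT a Flink class.
// An operator or function would keep it as a field set in its constructor.
public class ControllerEndpoint implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String host;
    private final int port;

    public ControllerEndpoint(String host, int port) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        this.host = host;
        this.port = port;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    // Builds a broker URI in the "tcp://host:port" form (assumed format).
    public String toUri() {
        return "tcp://" + host + ":" + port;
    }

    // Serializes and deserializes the endpoint in memory, mimicking what
    // happens when a field of a user function is shipped to a worker.
    public static ControllerEndpoint roundTrip(ControllerEndpoint endpoint) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(endpoint);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ControllerEndpoint) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // The address is the example value from this thread's flink-conf.yaml.
        ControllerEndpoint endpoint = new ControllerEndpoint("192.168.56.1", 1883);
        ControllerEndpoint copy = roundTrip(endpoint);
        System.out.println(copy.toUri());
    }
}
```

The trade-off is that the address has to be known when the job is
built, but the operator then no longer needs access to the TaskManager
configuration at all.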

[1] https://issues.apache.org/jira/browse/FLINK-17862

Best,
Yangze Guo

On Fri, May 22, 2020 at 9:56 PM Felipe Gutierrez
<felipe.o.gutier...@gmail.com> wrote:
>
> Thanks, it worked!
>
> I added the following method to the
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext
> class:
>
> public Environment getTaskEnvironment() { return this.taskEnvironment; }
>
> Then I get the IP using:
>
> ConfigOption<String> restAddressOption = ConfigOptions
>    .key("rest.address")
>    .stringType()
>    .noDefaultValue();
> String restAddress =
> this.getRuntimeContext().getTaskEnvironment().getTaskManagerInfo().getConfiguration().getValue(restAddressOption);
>
> Thanks!
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, May 22, 2020 at 3:54 AM Yangze Guo <karma...@gmail.com> wrote:
> >
> > Hi, Felipe
> >
> > I see your problem. IIUC, if you use AbstractUdfStreamOperator, you
> > can indeed get the whole configuration (including what you defined in
> > flink-conf.yaml) through
> > "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()".
> > However, I guess this is not the intended behavior and it might be
> > changed in future versions.
> >
> > Best,
> > Yangze Guo
> >
> >
> >
> > On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez
> > <felipe.o.gutier...@gmail.com> wrote:
> > >
> > > Hi all,
> > >
> > > I would like to have the IP of the JobManager, not of the Task
> > > Executors. Let me explain why.
> > >
> > > I have an operator (my own operator that extends
> > > AbstractUdfStreamOperator) that sends messages to and receives
> > > messages from a global controller. So, regardless of which TaskManager
> > > these operator instances are deployed on, they need to exchange
> > > messages with my controller. Currently, I am doing this using an MQTT
> > > broker (this is my first approach and I don't know if there is a
> > > better way to do it; maybe there is).
> > >
> > > The first thing I do is start my controller using
> > > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe
> > > it to the JobManager host. I get the IP of the JobManager by
> > > adding this method to the
> > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
> > > class:
> > >
> > > public String getRpcServiceAddress() {
> > >     return this.rpcService.getAddress();
> > > }
> > >
> > > That works, although I am not sure it is the best approach.
> > >
> > > The second thing that I am doing is to make each operator instance
> > > publish and subscribe to this controller. To do this they need the
> > > JobManager IP. I could get the TaskManager IPs from the
> > > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing
> > > the JobManager IP as a parameter to the operator at the moment. I
> > > suppose that it is easy to get the JobManager IP inside the
> > > AbstractUdfStreamOperator or simply add some method somewhere to get
> > > this value. However, I don't know where.
> > >
> > > Thanks,
> > > Felipe
> > >
> > > --
> > > -- Felipe Gutierrez
> > > -- skype: felipe.o.gutierrez
> > > -- https://felipeogutierrez.blogspot.com
> > >
> > > On Thu, May 21, 2020 at 7:13 AM Yangze Guo <karma...@gmail.com> wrote:
> > > >
> > > > Hi, Felipe
> > > >
> > > > Do you mean that you want to get the host and port of the task
> > > > executor on which your operator is actually running?
> > > >
> > > > If that is the case, IIUC, the two components that could contain this
> > > > information are RuntimeContext and the Configuration param of
> > > > RichFunction#open. After reading the relevant code path, it seems you
> > > > cannot get it at the moment.
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > >
> > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov
> > > > <alexan...@ververica.com> wrote:
> > > > >
> > > > > Hi Felipe,
> > > > >
> > > > > Could you clarify in more detail what you are trying to achieve?
> > > > >
> > > > > Best regards,
> > > > >
> > > > > --
> > > > >
> > > > > Alexander Fedulov | Solutions Architect
> > > > >
> > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez 
> > > > > <felipe.o.gutier...@gmail.com> wrote:
> > > > >>
> > > > >> Hi all,
> > > > >>
> > > > >> I have my own operator that extends the AbstractUdfStreamOperator
> > > > >> class and I want to issue some messages to it. Sometimes the operator
> > > > >> instances are deployed on different TaskManagers and I would like to
> > > > >> set some attributes like the master and slave IPs on it.
> > > > >>
> > > > >> I am trying to use these values, but they only return localhost, not
> > > > >> the IP configured in the flink-conf.yaml file (jobmanager.rpc.address:
> > > > >> 192.168.56.1).
> > > > >>
> > > > >> ConfigOption<String> restAddressOption = ConfigOptions
> > > > >>    .key("rest.address")
> > > > >>    .stringType()
> > > > >>    .noDefaultValue();
> > > > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
> > > > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
> > > > >> System.out.println("rpcService: " + rpcService.getAddress());
> > > > >>
> > > > >>
> > > > >> Thanks,
> > > > >> Felipe
> > > > >>
> > > > >> --
> > > > >> -- Felipe Gutierrez
> > > > >> -- skype: felipe.o.gutierrez
> > > > >> -- https://felipeogutierrez.blogspot.com
