Change Flink binding address in local mode

2019-12-08 Thread Andrea Cardaci
Hi,

Flink (or some of its services) listens on three random TCP ports
during the local[1] execution, e.g., 39951, 41009 and 42849.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/local_execution.html#local-environment

The sockets listen on `0.0.0.0`, and since I need to run some
long-running tests on an Internet-facing machine I was wondering how
to make them listen on `localhost` instead, or whether there is anything
else I can do to improve security in this scenario.

Here's what I tried (with little luck):

> Configuration config = new Configuration();
> config.setString("taskmanager.host", "127.0.0.1");
> config.setString("rest.bind-address", "127.0.0.1"); // OK
> config.setString("jobmanager.rpc.address", "127.0.0.1");
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
>         StreamExecutionEnvironment.getDefaultLocalParallelism(), config);

Only the `rest.bind-address` configuration actually changes the
binding address of one of those ports. Are there other parameters that
I'm not aware of, or is this not the right approach in local mode?


Best,
Andrea


Re: Emit intermediate accumulator state of a session window

2019-12-08 Thread Rafi Aroch
Hi Chandu,

Maybe you can use a custom trigger:
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(15)))

This would fire your aggregate continuously, once every configured period of time.
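As an untested sketch only (the stream name "events", the key selector and the
EventAggregator are placeholders for the names in your pipeline), the trigger
could be combined with the session window like this:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

// Sketch: the continuous trigger fires the window every 15 minutes of event time
// while the session is still open, so AggregateFunction#getResult() also emits
// intermediate values instead of firing only once when the session closes.
DataStream<String> progress = events                              // placeholder source stream
        .keyBy(e -> e.getUserId())                                // assumed key selector
        .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
        .trigger(ContinuousEventTimeTrigger.of(Time.minutes(15)))
        .aggregate(new EventAggregator());                        // aggregator from the original mail

As far as I know the continuous trigger only fires and does not purge, so each
firing sees the accumulator built up so far, which matches the watched-percentage
use case.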

Thanks,
Rafi


On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin  wrote:

> Hi Chandu,
>
> I am not sure whether using the windowing API is helpful in this case at
> all.
>
> At least, you could try to consume the data not only by windowing but also
> by a custom stateful function.
> You could look into the AggregatingState [1] (a rough sketch follows at the
> end of this thread). Then you could do whatever you want with the current
> aggregated value.
> If you still need to do something with the result of windowing, you could
> do it as now or simulate it with timers [2] in that same stateful function.
>
> Best,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example
>
> On Tue, Dec 3, 2019 at 12:21 AM chandu soa  wrote:
>
>> *Emit intermediate accumulator (AggregateFunction ACC value) state of a
>> session window when a new event arrives*
>>
>>
>>
>> AggregateFunction#getResult() is called only when the window completes. My
>> need is to emit intermediate accumulator values (the result of
>> AggregateFunction#add()) as well and write them to a Sink. Both
>> AggregateFunction#getResult() and ProcessWindowFunction() provide the
>> aggregated result only when the window is closed.
>>
>> *Any thoughts please on how to emit or stream the intermediate accumulator
>> state as soon as a new event arrives while the window is open? Do I need to
>> implement a custom trigger or assigner?*
>>
>>
>>
>> To give you some background: when a user watches a video we get events -
>> when it is clicked, thereafter every ~15 minutes, and finally when the user
>> closes the video.
>>
>> I need to aggregate them as soon as they arrive and post the result to the
>> destination. For example, if a user is watching a two-hour movie I get events
>> at 15-minute intervals (0, 15, 30, ..., 120); whenever I get an event I need to
>> aggregate the percentage watched so far and write it to the sink (0%, 12.5%,
>> 25%, ..., 100%). The implementation below emits (via getResult()) a single
>> event 20 minutes after the video is watched.
>>
>> .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
>> .aggregate(new EventAggregator())
>> .filter(new FinalFilter())
>> .addSink(...)
>>
>>
>> Appreciate your help.
>>
>>
>> Thanks,
>>
>> chandu
>>
>
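A rough, untested sketch of the AggregatingState-plus-timer idea Andrey describes
above (everything here is illustrative: the Double input standing for watched
minutes per event, the hard-coded 120-minute total and the class names are
assumptions, not code from this thread):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits the running aggregate on every incoming event instead of waiting for a
// window to close. Keyed by a String (e.g. a session or user id).
public class WatchProgressFunction extends KeyedProcessFunction<String, Double, Double> {

    private transient AggregatingState<Double, Double> watchedPercentage;

    @Override
    public void open(Configuration parameters) {
        watchedPercentage = getRuntimeContext().getAggregatingState(
                new AggregatingStateDescriptor<>("watched-pct",
                        new AggregateFunction<Double, Double, Double>() {
                            @Override public Double createAccumulator() { return 0.0; }
                            @Override public Double add(Double watchedMinutes, Double acc) { return acc + watchedMinutes; }
                            @Override public Double getResult(Double acc) { return 100.0 * acc / 120.0; } // assumed 2h movie
                            @Override public Double merge(Double a, Double b) { return a + b; }
                        },
                        Types.DOUBLE));
    }

    @Override
    public void processElement(Double watchedMinutes, Context ctx, Collector<Double> out) throws Exception {
        watchedPercentage.add(watchedMinutes);
        out.collect(watchedPercentage.get()); // intermediate value, emitted on every event
        // ctx.timerService().registerEventTimeTimer(...) could emulate the 20-minute session gap
    }
}

The stream would then be wired up as keyedStream.process(new WatchProgressFunction()),
with the timer service taking over any window-close behaviour that is still needed.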


[ANNOUNCE] Weekly Community Update 2019/49

2019-12-08 Thread Konstantin Knauf
Dear community,

happy to share this week's community digest with an update on Flink 1.8.3,
a revival of the n-ary stream operator, a proposal to move our build
infrastructure to Azure pipelines, and quite a few other topics. Enjoy.

Flink Development
==

* [releases] The feature freeze for *Flink 1.10* is tonight.

* [releases] Hequn has published and started a vote on RC3 for *Flink 1.8.3*.
Voting is open until Dec. 10th 2019, 16:00 UTC. No votes so far, but I
assume this will change after the feature freeze. [1]

* [runtime] Piotr has restarted the discussion to add an *n-ary stream
operator* which would help to support multi-broadcast joins in Flink SQL
(think of a star schema). The topic has been discussed before (in 2016) in
the context of side-inputs and there is already an old design document
drafted by Aljoscha to build on top. [2]

* [connectors] Chesnay started a conversation to drop support for the *Kafka
0.8/0.9* connectors in the upcoming release. It seems that quite a few
people are in favor of dropping, but Becket also made a valid point to only
deprecate these connectors instead of removing them altogether. [3]

* [connectors] Becket has started the vote on *FLIP-27, the new source
interface.* This has been a long ongoing topic, but it has not officially
been voted on so far. So there we go. [4,5]

* [state backends] Stephan has proposed to drop support for the *synchronous
mode of the heap state backend*. One supporting comment so far. [6]

* [hadoop] Craig Foster brought up the topic of *Hadoop 3* support in
Apache Flink*.* There is currently no one working on this topic, but Marton
Balassi of Cloudera reported that they have been working on this internally
and would be willing to contribute their work back next year. [7]

* [development process] There is currently no *end-to-end test for Flink
deployments on Mesos*. Yangze suggests adding such tests now and has started a
discussion thread on the topic. He is now looking for feedback from Mesos
production users in order to improve the planned test suite. It seems the
community will need to maintain their own Mesos Docker images for these
tests due to Java compatibility issues with the official Mesos Docker
images. [8]

* [development process] Dawid has asked committers to only vote with
their *apache.org email* addresses and to only count these votes as
binding going forward. Only for apache.org addresses is it possible to
verify the status of the voter. [9]

* [development process] Following the discussion on reducing the build time
of Apache Flink, Robert and others did some experiments to migrate our
build from *Travis CI to Azure Pipelines*. In this thread [10] and wiki
page [11] he presents his results and asks for opinions on how to move
forward. So far there has been a lot of positive feedback on migrating to
Azure Pipelines, mainly motivated by lower build times (due to additional
sponsored machines) and richer features.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-3-release-candidate-3-tp35628.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-tp11341p35554.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-Kafka-0-8-0-9-tp35553.html
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-27-Refactor-Source-Interface-tp35569.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-Heap-Backend-Synchronous-snapshots-tp35621.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Re-Building-with-Hadoop-3-tp35522p35528.html
[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-e2e-tests-for-Flink-s-Mesos-integration-tp35660p35687.html
[9]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Voting-from-apache-org-addresses-tp35499.html
[10]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Migrate-build-infrastructure-from-Travis-CI-to-Azure-Pipelines-tp35538.html
[11]
https://cwiki.apache.org/confluence/display/FLINK/%5Bpreview%5D+Azure+Pipelines

Notable Bugs
==

* [FLINK-15063] [1.9.1] The scopes of the input/output metrics of the
network stack are interchanged, e.g. the outPoolUsage metric can be found
under the task-level scope "shuffle.netty.input" instead of
"shuffle.netty.output". Fixed for 1.9.2. [12]

* [FLINK-14949] [1.9.1] [1.8.2] A job can get stuck during cancellation,
e.g. if Flink cannot spawn the threads that perform exactly this
cancellation. Fixed for 1.9.2. [13]

[12] https://issues.apache.org/jira/browse/FLINK-15063
[13] https://issues.apache.org/jira/browse/FLINK-14949

Events, Blog Posts, Misc
===

* *Markos* and *Yuan* have published a recap of Flink Forward Asia 2019 on
the Ververica blog including a short summary

Re: KeyBy/Rebalance overhead?

2019-12-08 Thread Komal Mariam
Anyone?

On Fri, 6 Dec 2019 at 19:07, Komal Mariam  wrote:

> Hello everyone,
>
> I want to get some insights on the KeyBy (and Rebalance) operations as,
> according to my understanding, they partition our tasks over the defined
> parallelism and thus should make our pipeline faster.
>
> I am reading a topic which contains 170,000,000 pre-stored records with 11
> Kafka partitions and a replication factor of 1. Hence I use
> .setStartFromEarliest() to read the stream.
> My Flink setup is a 4-node cluster with 3 task managers, each having 10 cores,
> and 1 job manager with 6 cores (10 task slots per TM, hence I set the
> environment parallelism to 30).
>
> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
> keeping the number of records fixed to get a handle on how fast they're
> being processed.
>
> When I remove keyBy, I get the same results in 39 secs as opposed to 52
> secs with keyBy. In fact, even when I vary the parallelism down to 10 or
> below I still get the same extra overhead of 9 to 13 secs. My data is mostly
> uniformly distributed on its key, so I can rule out skew. Rebalance
> likewise has the same latency as keyBy.
>
>  What I want to know is what may be causing this overhead? And is there
> any way to decrease it?
>
> Here's the script I'm running for testing purposes:
> --
> DataStream<ObjectNode> JSONStream = env.addSource(new FlinkKafkaConsumer<>("data",
>         new JSONKeyValueDeserializationSchema(false), properties).setStartFromEarliest());
>
> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>
> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>
> public class findDistancefromPOI extends RichFilterFunction<Point> {
>     public boolean filter(Point input) throws Exception {
>         Double distance = computeEuclideanDist(16.4199, 89.974, input.X(), input.Y());
>         return distance > 0;
>     }
> }
>
> Best Regards,
> Komal
>


Re: User program failures cause JobManager to be shutdown

2019-12-08 Thread Dongwon Kim
Hi Robert and Roman,
Yeah, letting users know System.exit() is called would be much more
appropriate than just intercepting and ignoring.

Best,
Dongwon
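
P.S. A minimal, untested sketch of a logging-only SecurityManager in the spirit
of Robert's suggestion (the class name and log output are made up; Roman's
caveat applies, since a stricter variant could also interfere with Flink's own
exit paths):

import java.security.Permission;

// Sketch: installed before user code runs, this logs System.exit() calls together
// with the call site but still lets the exit proceed. Throwing a SecurityException
// in checkExit(...) would intercept the call instead, which Robert argues against.
public final class ExitLoggingSecurityManager extends SecurityManager {

    @Override
    public void checkExit(int status) {
        System.err.println("System.exit(" + status + ") called by user code");
        new Exception("System.exit call site").printStackTrace();
        // not throwing anything here, so the JVM is still allowed to exit
    }

    @Override
    public void checkPermission(Permission perm) {
        // permit everything else; we only care about exit calls
    }

    public static void install() {
        System.setSecurityManager(new ExitLoggingSecurityManager());
    }
}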

On Sat, Dec 7, 2019 at 11:29 PM Robert Metzger  wrote:

> I guess we could manage the security only when calling the user's main()
> method.
>
> This problem actually exists for all user code in Flink: you can also kill
> TaskManagers like this.
> If we are going to add something like this to Flink, I would only log that
> System.exit() has been called by the user code, not intercept and ignore
> the call.
>
> On Fri, Dec 6, 2019 at 10:31 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> This should work but it could also interfere with Flink itself exiting in
>> case of a fatal error.
>>
>> Regards,
>> Roman
>>
>>
>> On Fri, Dec 6, 2019 at 2:54 AM Dongwon Kim  wrote:
>>
>>> FYI, we've launched a session cluster where multiple jobs are managed by
>>> a job manager. If that happens, all the other jobs also fail because the
>>> job manager is shut down and all the task managers get into chaos (failing
>>> to connect to the job manager).
>>>
>>> I just searched for a way to prevent System.exit() calls from terminating
>>> JVMs and found [1]. Can it be a possible solution to the problem?
>>>
>>> [1]
>>> https://stackoverflow.com/questions/5549720/how-to-prevent-calls-to-system-exit-from-terminating-the-jvm
>>>
>>> Best,
>>> - Dongwon
>>>
>>> On Fri, Dec 6, 2019 at 10:39 AM Dongwon Kim 
>>> wrote:
>>>
 Hi Robert and Roman,

 Thank you for taking a look at this.

 what is your main() method / client doing when it's receiving wrong
> program parameters? Does it call System.exit(), or something like that?
>

 I just found that our HTTP client is programmed to call System.exit(1).
 I should guide people not to call System.exit() in Flink applications.

 p.s. Just out of curiosity, is there no way for the web app to
 intercept System.exit() and prevent the job manager from being shut
 down?

 Best,

 - Dongwon

 On Fri, Dec 6, 2019 at 3:59 AM Robert Metzger 
 wrote:

> Hi Dongwon,
>
> what is your main() method / client doing when it's receiving wrong
> program parameters? Does it call System.exit(), or something like that?
>
> By the way, the http address from the error message is
> publicly available. Not sure if this is internal data or not.
>
> On Thu, Dec 5, 2019 at 6:32 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> I wasn't able to reproduce your problem with Flink JobManager 1.9.1
>> with various kinds of errors in the job.
>> I suggest you try it on a fresh Flink installation without any other
>> jobs submitted.
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Dec 5, 2019 at 3:48 PM Dongwon Kim 
>> wrote:
>>
>>> Hi Roman,
>>>
>>> We're using the latest version 1.9.1 and those two lines are all
>>> I've seen after executing the job on the web ui.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Thu, Dec 5, 2019 at 11:36 PM r_khachatryan <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi Dongwon,

 Could you please provide Flink version you are running and the job
 manager
 logs?

 Regards,
 Roman


 eastcirclek wrote
 > Hi,
 >
 > I tried to run a program by uploading a jar on the Flink UI. When I
 > intentionally enter a wrong parameter to my program, the JobManager dies.
 > Below are all the log messages I can get from the JobManager; it dies as
 > soon as it spits out the second line:
 >
 >> 2019-12-05 04:47:58,623 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler -
 >> Configuring the job submission via query parameters is deprecated. Please
 >> migrate to submitting a JSON request instead.
 >>
 >> 2019-12-05 04:47:59,133 ERROR com.skt.apm.http.HTTPClient - Cannot connect:
 >> http://52.141.38.11:8380/api/spec/poc_asset_model_01/model/imbalance/models :
 >> com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot
 >> deserialize instance of `java.util.ArrayList` out of START_OBJECT token at
 >> [Source: (String)"{"code":"GB0001","resource":"msg.comm.unknown.error","details":"NullPointerException: "}"; line: 1, column: 1]
 >>
 >> 2019-12-05 04:47:59,166 INFO  org.apache.flink.runtime.blob.BlobServer - Stopped
 >> BLOB server at 0.0.0.0:6124 <

Sample Code for querying Flink's default metrics

2019-12-08 Thread Pankaj Chand
Hello,

Using Flink on Yarn, I could not understand the documentation for how to
read the default metrics via code. In particular, I want to read
throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
Memory.

Is there any sample code for how to read such default metrics?  Is there
any way to query the default metrics, such as CPU usage and Memory, without
using REST API or Reporters?

Additionally, how do I query backpressure using code, or is it still only
visually available via the dashboard UI? Alternatively, is there any way to
infer backpressure by querying one (or more) of the memory metrics of the
TaskManager?

Thank you,

Pankaj


Re: KeyBy/Rebalance overhead?

2019-12-08 Thread vino yang
Hi Komal,

KeyBy (hash partitioning, a logical partitioning) and rebalance (a physical
partitioning) are both partitioning strategies supported by Flink. [1]

Generally speaking, partitioning may incur network communication (network
shuffle) costs, which can add to the overall time. The example you provided may
benefit from operator chaining [2] if you remove the keyBy operation.

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
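
To make the chaining point concrete, a small illustrative comparison using the
operator names from Komal's snippet (a sketch, not a measured result):

// Without keyBy: map -> filter can be chained into the same task, so records stay
// in the same thread and no network shuffle happens.
myPoints.filter(new findDistancefromPOI());

// With keyBy: records are hash-partitioned by "oID" across all parallel filter
// subtasks, which adds serialization and network transfer for every record.
myPoints.keyBy("oID").filter(new findDistancefromPOI());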

On Mon, 9 Dec 2019 at 09:11, Komal Mariam  wrote:

> Anyone?
>
> On Fri, 6 Dec 2019 at 19:07, Komal Mariam  wrote:
>
>> Hello everyone,
>>
>> I want to get some insights on the KeyBy (and Rebalance) operations as,
>> according to my understanding, they partition our tasks over the defined
>> parallelism and thus should make our pipeline faster.
>>
>> I am reading a topic which contains 170,000,000 pre-stored records with
>> 11 Kafka partitions and a replication factor of 1. Hence I use
>> .setStartFromEarliest() to read the stream.
>> My Flink setup is a 4-node cluster with 3 task managers, each having 10 cores,
>> and 1 job manager with 6 cores (10 task slots per TM, hence I set the
>> environment parallelism to 30).
>>
>> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
>> keeping the number of records fixed to get a handle on how fast they're
>> being processed.
>>
>> When I remove keyBy, I get the same results in 39 secs as opposed to 52
>> secs with keyBy. In fact, even when I vary the parallelism down to 10 or
>> below I still get the same extra overhead of 9 to 13 secs. My data is mostly
>> uniformly distributed on its key, so I can rule out skew. Rebalance
>> likewise has the same latency as keyBy.
>>
>>  What I want to know is what may be causing this overhead? And is there
>> any way to decrease it?
>>
>> Here's the script I'm running for testing purposes:
>> --
>> DataStream<ObjectNode> JSONStream = env.addSource(new FlinkKafkaConsumer<>("data",
>>         new JSONKeyValueDeserializationSchema(false), properties).setStartFromEarliest());
>>
>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>
>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>
>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>     public boolean filter(Point input) throws Exception {
>>         Double distance = computeEuclideanDist(16.4199, 89.974, input.X(), input.Y());
>>         return distance > 0;
>>     }
>> }
>>
>> Best Regards,
>> Komal
>>
>


Re: [DISCUSS] Adding e2e tests for Flink's Mesos integration

2019-12-08 Thread Yang Wang
Thanks Yangze for starting this discussion.

Just share my thoughts.

If the official Mesos Docker image cannot meet our requirements, I suggest
building the image locally. We have done the same thing for the YARN e2e
tests. This way is more flexible and easier to maintain. However, I have no
idea how long building the Mesos image locally will take. Based on previous
experience with YARN, I think it may not take too much time.



Best,
Yang

On Sat, 7 Dec 2019 at 16:25, Yangze Guo  wrote:

> Thanks for your feedback!
>
> @Till
> Regarding the time overhead, I think it mainly comes from the network
> transmission. Building the image locally will download about 260 MB of
> files in total, including the base image and packages. Pulling from
> DockerHub, the compressed size of the image is 347 MB. Thus, I agree
> that it is OK to build the image locally.
>
> @Piyush
> Thank you for offering to help and for sharing your usage scenario. At the
> current stage, I think it would be really helpful if you could compress
> the custom image [1] or reduce the time overhead of building it locally.
> Any ideas for improving test coverage will also be appreciated.
>
> [1]
> https://hub.docker.com/layers/karmagyz/mesos-flink/latest/images/sha256-4e1caefea107818aa11374d6ac8a6e889922c81806f5cd791ead141f18ec7e64
>
> Best,
> Yangze Guo
>
> On Sat, Dec 7, 2019 at 3:17 AM Piyush Narang  wrote:
> >
> > +1 from our end as well. At Criteo, we are running some Flink jobs on
> Mesos in production to compute short-term features for machine learning.
> We'd love to help out and contribute to this initiative.
> >
> > Thanks,
> > -- Piyush
> >
> >
> > From: Till Rohrmann 
> > Date: Friday, December 6, 2019 at 8:10 AM
> > To: dev 
> > Cc: user 
> > Subject: Re: [DISCUSS] Adding e2e tests for Flink's Mesos integration
> >
> > Big +1 for adding a fully working e2e test for Flink's Mesos
> integration. Ideally we would have it ready for the 1.10 release. The lack
> of such a test has bitten us already multiple times.
> >
> > In general I would prefer to use the official image if possible since it
> frees us from maintaining our own custom image. Since Java 9 is no longer
> officially supported and we opted for supporting Java 11 (LTS) instead, it
> might not be feasible, though. How much longer would building the custom image
> take compared to downloading it from DockerHub? Maybe it is OK to build the
> image locally. Then we would not have to maintain the image.
> >
> > Cheers,
> > Till
> >
> > On Fri, Dec 6, 2019 at 11:05 AM Yangze Guo  wrote:
> > Hi, all,
> >
> > Currently, there is no end-to-end test or IT case for Mesos deployment,
> > while common deployment-related development would inevitably touch
> > the logic of this component. Thus, some work needs to be done to
> > guarantee the experience for both Mesos users and contributors. After
> > offline discussion with Till and Xintong, we have some basic ideas and
> > would like to start a discussion thread on adding end-to-end tests for
> > Flink's Mesos integration.
> >
> > As a first step, we would like to keep the scope of this contribution
> > relatively small. This may also help us to quickly get some basic
> > test cases that might be helpful for the upcoming 1.10 release.
> >
> > As far as we can think of, what needs to be done is to set up a Mesos
> > framework during the testing and determine which tests need to be
> > included.
> >
> >
> > ** Regarding the Mesos framework: after trying out several approaches,
> > I find that setting up Mesos in Docker is probably what we want. The
> > resources needed for building and setting up Mesos from source are
> > probably not affordable in most scenarios. So, the one open
> > question worth discussing is the choice of Docker image. We have
> > come up with two options.
> >
> > - Using the official Mesos image [1]
> > The official image was the first alternative that came to our mind,
> > but we ran into some sort of Java version compatibility problem that
> > leads to failures when launching task executors. Flink has supported Java 9
> > since version 1.9.0 [2]. However, the official Docker image of Mesos
> > is built with a development version of JDK 9, which has probably
> > caused this problem. Unless we want to make Flink also
> > compatible with the JDK development version used by the official Mesos
> > image, this option does not work out. Besides, according to the
> > official roadmap [5], Java 9 is not a long-term support version, which
> > may bring stability risks in the future.
> >
> > - Build a custom image
> > I've already tried building a custom image [3] and successfully ran most
> > of the existing end-to-end test cases with it. The image is built
> > with Ubuntu 16.04, JDK 8 and Mesos 1.7.1. For the Mesos e2e test
> > framework, we could either build the image from a Dockerfile or pull
> > the pre-built image from DockerHub (or other hub services) during the
> > testing.
> > If we decide to publish an image on DockerHub, we probably

Re: Flink 'Job Cluster' mode Ui Access

2019-12-08 Thread Jatin Banger
Hi,

I have checked the logs for the keyword *StaticFileServerHandler* in
them, but there were no such log entries for the "Flink Job Cluster".
Then I checked the Flink Session Cluster, and I was able to find log entries for
the *StaticFileServerHandler* keyword.

Can I raise this as a bug?

Best Regards,
Jatin


On Thu, Dec 5, 2019 at 8:59 PM Chesnay Schepler  wrote:

> Ok, it's good to know that the WebUI files are there.
>
> Please enable DEBUG logging and try again, searching for messages from the
> StaticFileServerHandler.
>
> This handler logs every file that is requested (which effectively happens
> when the WebUI is being served); let's see what is actually being requested.
>
> On 05/12/2019 05:57, Jatin Banger wrote:
>
> I have tried that already using
> "$FLINK_HOME/bin/jobmanager.sh" start-foreground.
> The UI comes up fine with this one,
> which means web/index.html is present.
>
>
> On Wed, Dec 4, 2019 at 9:01 PM Chesnay Schepler 
> wrote:
>
>> hmm...this is quite odd.
>>
>> Let's try to narrow things down a bit.
>>
>> Could you try starting a local cluster (using the same distribution) and
>> checking whether the UI is accessible?
>>
>> Could you also check whether the flink-dist.jar in /lib contains
>> web/index.html?
>> On 04/12/2019 06:02, Jatin Banger wrote:
>>
>> Hi,
>>
>> I am using flink binary directly.
>>
>> I am using this command to deploy the script.
>>
>> "$FLINK_HOME/bin/standalone-job.sh"
>> start-foreground --job-classname ${ARGS_FOR_JOB}
>> where ARGS_FOR_JOB contain job class name and all other necessary details
>> needed by the job.
>>
>> Best regards,
>> Jatin
>>
>>
>> On Fri, Nov 29, 2019 at 4:18 PM Chesnay Schepler 
>> wrote:
>>
>>> To clarify, you ran "mvn package -pl flink-dist -am" to build Flink?
>>>
>>> If so, could you run that again and provide us with the maven output?
>>>
>>> On 29/11/2019 11:23, Jatin Banger wrote:
>>>
>>> Hi,
>>>
>>> @vino yang   I am using flink 1.8.1
>>>
>>> I am using the following procedure for the deployment:
>>>
>>> https://github.com/apache/flink/blob/master/flink-container/docker/README.md
>>>
>>> And I tried accessing the path you mentioned:
>>>
>>> # curl :4081/#/overview
>>> {"errors":["Not found."]}
>>>
>>> Best Regards,
>>> Jatin
>>>
>>> On Thu, Nov 28, 2019 at 10:21 PM Chesnay Schepler 
>>> wrote:
>>>
 Could you try accessing :/#/overview ?

 The REST API is obviously accessible, and hence the WebUI should be too.

 How did you set up the session cluster? Are you using some custom Flink
 build or something, which potentially excluded flink-runtime-web from the
 classpath?

 On 28/11/2019 10:02, Jatin Banger wrote:

 Hi,

 I checked the log file there is no error.
 And I checked the pods internal ports by using rest api.

 # curl : 4081
 {"errors":["Not found."]}
 4081 is the Ui port

 # curl :4081/config
 {"refresh-interval":3000,"timezone-name":"Coordinated Universal
 Time","timezone-offset":0,"flink-version":"","flink-revision":"ceba8af
 @ 11.02.2019 @ 22:17:09 CST"}

 # curl :4081/jobs
 {"jobs":[{"id":"___job_Id_","status":"RUNNING"}]}

 Which shows the state of the job as running.

 What else can we do ?

 Best regards,
 Jatin

 On Thu, Nov 28, 2019 at 1:28 PM vino yang 
 wrote:

> Hi Jatin,
>
> Flink web UI does not depend on any deployment mode.
>
> You should check whether there are error logs in the log file and whether the
> job status is in the RUNNING state.
>
> Best,
> Vino
>
> On Thu, 28 Nov 2019 at 15:43, Jatin Banger  wrote:
>
>> Hi,
>>
>> It seems there is a Web UI for the Flink Session Cluster, but for the Flink Job
>> Cluster it is showing
>>
>> {"errors":["Not found."]}
>>
>> Is it the expected behavior for Flink Job Cluster Mode ?
>>
>> Best Regards,
>> Jatin
>>
>

>>>
>>
>


Re: Need help using AggregateFunction instead of FoldFunction

2019-12-08 Thread vino yang
Hi dev,

The time parameter of a window may have different semantics.
In a session window, it is only the inactivity gap; the size of the window is
driven by the activity of the events.
In a tumbling or sliding window, it is the size of the window.

For more details, please see the official documentation.[1]

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows
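
As a quick illustration of the difference, using the 5-second value from this
thread (`tuples` stands in for the keyed Tuple2 stream from the snippet quoted
below; this is a sketch, not code from the thread):

// Session window: 5 seconds is only the inactivity gap; the window stays open as
// long as events for the key keep arriving within 5 seconds of each other.
tuples.keyBy(0)
      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
      .aggregate(new JsonConcatenator());

// Tumbling window: 5 seconds is the fixed window size, so the window closes every
// 5 seconds regardless of activity.
tuples.keyBy(0)
      .timeWindow(Time.seconds(5))
      .aggregate(new JsonConcatenator());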



On Fri, 6 Dec 2019 at 22:39, devinbost  wrote:

> I think there might be a bug in
> `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
>  (unless I'm just not using it correctly) because I'm able to get output
> when I use the simpler window
> `.timeWindow(Time.seconds(5))`.
> However, I don't get any output when I use the session-based window.
>
>
> devinbost wrote
> > I added logging statements everywhere in my code, and I'm able to see my
> > message reach the `add` method in the AggregateFunction that I
> > implemented,
> > but the getResult method is never called.
> >
> > In the code below, I also never see the:
> >  "Ran dataStream. Adding sink next"
> > line appear in my log, and the only log statements from the
> > JsonConcatenator
> > class come from the `add` method, as shown below.
> >
> >
> > DataStream<String> combinedEnvelopes = dataStream
> >     .map(new MapFunction<String, Tuple2<String, String>>() {
> >         @Override
> >         public Tuple2<String, String> map(String incomingMessage) throws Exception {
> >             return mapToTuple(incomingMessage);
> >         }
> >     })
> >     .keyBy(0)
> >     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
> >     .aggregate(new JsonConcatenator());
> >
> > Logger logger = LoggerFactory.getLogger(StreamJob.class);
> > logger.info("Ran dataStream. Adding sink next");
> >
> > -
> >
> > private static class JsonConcatenator
> >         implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
> >     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
> >
> >     @Override
> >     public Tuple2<String, String> createAccumulator() {
> >         return new Tuple2<>("", "");
> >     }
> >
> >     @Override
> >     public Tuple2<String, String> add(Tuple2<String, String> value,
> >                                       Tuple2<String, String> accumulator) {
> >         logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
> >         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
> >     }
> >
> >     @Override
> >     public String getResult(Tuple2<String, String> accumulator) {
> >         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
> >         return "[" + accumulator.f1 + "]";
> >     }
> >
> >     @Override
> >     public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
> >         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
> >         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
> >     }
> > }
> >
> >
> >
> >
> > Any ideas?
> >
> >
> > Chris Miller-2 wrote
> >> I hit the same problem, as far as I can tell it should be fixed in
> >> Pulsar 2.4.2. The release of this has already passed voting so I hope
> it
> >> should be available in a day or two.
> >>
> >> https://github.com/apache/pulsar/pull/5068
> >
> >
> >
> >
> >
> > --
> > Sent from:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>