[Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-17 Thread Jeff Zhang
Hi All,

I created FLINK-12214 for adding a JobListener (hook) to the Flink job
lifecycle. Since this is a new public API for Flink, I'd like to discuss it
more widely in the community to get more feedback.

The background and motivation is that I am integrating Flink into Apache
Zeppelin (a notebook, in case you don't know it), and I'd like to capture some
job context (like the jobId) during the lifecycle of a Flink job (submission,
execution, cancellation) so that I can manipulate the job with more
fine-grained control. For example, I can capture the jobId when the job is
submitted, associate it with one paragraph, and when the user clicks the
cancel button, call the Flink cancel API to cancel this job.

I believe other projects which integrate Flink would need a similar
mechanism. I plan to add an addJobListener API to
ExecutionEnvironment/StreamExecutionEnvironment so that users can add
customized hooks to the Flink job lifecycle.

Here's a draft of the JobListener interface:

public interface JobListener {

void onJobSubmitted(JobID jobId);

void onJobExecuted(JobExecutionResult jobResult);

void onJobCanceled(JobID jobId, String savepointPath);
}
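
For illustration, registering such a listener might look roughly like the
sketch below. Note that addJobListener and this whole flow are only part of
the proposal, not an existing API, and the usual Flink imports are assumed:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Hypothetical usage of the proposed API.
env.addJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(JobID jobId) {
        // e.g. associate the jobId with a Zeppelin paragraph
        System.out.println("Submitted job " + jobId);
    }

    @Override
    public void onJobExecuted(JobExecutionResult jobResult) {
        System.out.println("Finished job " + jobResult.getJobID());
    }

    @Override
    public void onJobCanceled(JobID jobId, String savepointPath) {
        System.out.println("Canceled job " + jobId + ", savepoint at " + savepointPath);
    }
});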

Let me know your comments and concerns, thanks.


-- 
Best Regards

Jeff Zhang


flink program in a spring bean can not consume from kafka

2019-04-17 Thread jszhouch...@163.com
Hi, I met a strange issue: the same code running in a plain Java class can consume
from Kafka, but when I change the Java class to a Spring bean (annotated with
@Service), the program cannot consume from Kafka anymore. Has anyone met a similar
problem, or how can I debug it? Thanks a lot.

Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-17 Thread Paul Lam
Hi,

Could you check the watermark of the window operator? One possible situation
is that some of the keys are not getting enough input, so their watermarks
remain below the window end time and hold back the window operator's watermark.
IMO, it's good practice to assign watermarks earlier in the data pipeline.

Best,
Paul Lam

> On 17 Apr 2019, at 23:04, an0...@gmail.com wrote:
> 
> `assignTimestampsAndWatermarks` before `keyBy` works:
> ```java
> DataStream<Trip> trips = env.addSource(consumer)
>     .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>         @Override
>         public long extractTimestamp(Trip trip) {
>             return trip.endTime.getTime();
>         }
>     });
> KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> AllWindowedStream windowedUserTrips =
>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> ```
> 
> But not after `keyBy` and `process`:
> ```java
> DataStream<Trip> trips = env.addSource(consumer);
> KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization())
>     .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
>         @Override
>         public long extractTimestamp(FeaturizedTrip trip) {
>             return trip.endTime.getTime();
>         }
>     });
> AllWindowedStream windowedUserTrips =
>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> ```
> Windows are never triggered.
> 
> Is it a bug or expected behavior? If the latter, where is it documented?
> 



Re: Service discovery on YARN - find out which port was dynamically assigned to the JobManager Web Interface

2019-04-17 Thread Rong Rong
As far as I know, the port will be randomly bound.

YARN actually has the ability to translate the proxy link to the right
node/port.
If your goal is to avoid going through the YARN REST proxy, this could be a
problem: there's a chance that the host/port will get changed by YARN without
any notification back to Consul (or is there a way? I am not sure). This could
happen either through YARN's native failure recovery mechanism (2nd attempt)
or through HA.

CC'ed Till, who might be able to answer more comprehensively regarding this.

Thanks,
Rong

On Wed, Apr 17, 2019 at 7:14 AM Olivier Solliec 
wrote:

> Hello,
>
>
> I want to be able to register a flink cluster into a service discovery
> system (Consul in our case).
>
> This flink cluster is scheduled on YARN.
>
>
> Is there a way to know which port was assigned to the rest interface ?
>
>
> Via the rest API /jobmanager/config, I see a key "jobmanager.rpc.address"
> which is the correct yarn node, but both "rest.port" and "web.port" have
> "0" value.
>
>
> The idea is to launch a cluster, retrieve its app id, use the yarn web ui
> proxy to get the right node address/port, and register this into Consul.
>
>
> Thank you,
>
>
> Olivier
>


Re: java.io.IOException: NSS is already initialized

2019-04-17 Thread Hao Sun
I think I found the root cause

https://bugs.alpinelinux.org/issues/10126

I have to re-install nss after apk update/upgrade

Hao Sun


On Sun, Nov 11, 2018 at 10:50 AM Ufuk Celebi  wrote:

> Hey Hao,
>
> 1) Regarding Hadoop S3: are you using the repackaged Hadoop S3
> dependency from the /opt folder of the Flink distribution? Or the
> actual Hadoop implementation? If latter, would you mind also running
> it with the one that comes packaged with Flink? For this you can
> remove all Hadoop-related configuration in your flink-conf.yaml and
> copy the Hadoop S3 dependency from /opt to /lib and configure it [1].
>
> 2) Could you please share your complete Flink configuration for when
> you tried to run with Presto S3? If you don't want to share this
> publicly, feel free to share it privately with me. I'm curious to see
> whether we can reproduce this.
>
> – Ufuk
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
> 
> On Sat, Nov 10, 2018 at 4:07 PM Hao Sun  wrote:
> >
> > Hi Ufuk, thanks for checking. I am using openJDK 1.8_171, I still have
> the same issue with presto.
> >
> > - Why is the checkpoint not starting from 1? Old checkpoints stored in ZK
> > caused it; I cleaned them up, but it was not very helpful.
> > - I switched to Flink + Hadoop28 and used Hadoop S3, with no other
> > changes; checkpointing is working with the Hadoop flavour.
> >
> > On Fri, Nov 9, 2018 at 2:02 PM Ufuk Celebi  wrote:
> >>
> >> Hey Hao Sun,
> >>
> >> - Is this an intermittent failure or permanent? The logs indicate that
> >> some checkpoints completed before the error occurs (e.g. checkpoint
> >> numbers are greater than 1).
> >>
> >> - Which Java versions are you using? And which Java image? I've
> >> Googled similar issues that seem to be related to the JVM, e.g. [1].
> >>
> >> Best,
> >>
> >> Ufuk
> >>
> >> [1]
> https://dev.lucee.org/t/could-not-initialize-class-sun-security-ssl-sslcontextimp/3972
> 
> >>
> >>
> >> On Thu, Nov 8, 2018 at 8:55 PM Hao Sun  wrote:
> >> >
> >> > Thanks, any insight/help here is appreciated.
> >> >
> >> > On Thu, Nov 8, 2018 at 4:38 AM Dawid Wysakowicz <
> dwysakow...@apache.org> wrote:
> >> >>
> >> >> Hi Hao,
> >> >>
> >> >> I am not sure, what might be wrong, but I've cc'ed Gary and Kostas
> who were recently working with S3, maybe they will have some ideas.
> >> >>
> >> >> Best,
> >> >>
> >> >> Dawid
> >> >>
> >> >> On 03/11/2018 03:09, Hao Sun wrote:
> >> >>
> >> >> Same environment, new error.
> >> >>
> >> >> I can run the same docker image with my local Mac, but on K8S, this
> gives me this error.
> >> >> I can not think of any difference between local Docker and K8S
> Docker.
> >> >>
> >> >> Any hint will be helpful. Thanks
> >> >>
> >> >> 
> >> >>
> >> >> 2018-11-02 23:29:32,981 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
> ConnectedStreams maxwell.accounts ()
> switched from state RUNNING to FAILING.
> >> >> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 235 for operator Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).}
> >> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> >> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> >> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> >> >> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> >> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> >> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> >> at java.lang.Thread.run(Thread.java:748)
> >> >> Caused by: java.lang.Exception: Could not materialize checkpoint 235
> for operator Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).
> >> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> >> >> ... 6 more
> >> >> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NoClassDefFoundError: Could not initialize class
> sun.security.ssl.SSLSessionImpl
> >> >> at 

BucketingSink compressed text exactlyonce

2019-04-17 Thread Shengnan YU
Hi guys,
Are there any good ideas for achieving an exactly-once BucketingSink for text
files? Truncating a compressed binary file will corrupt the gzip file, which
means I need to -text that gzip, redirect it to a text file, compress it again,
and finally upload it to HDFS. It's really inefficient. Is there any other
compressed format suitable for Flink's fault tolerance mechanism? Thank you.

Sent from my iPhone

assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-17 Thread an00na
`assignTimestampsAndWatermarks` before `keyBy` works:
```java
DataStream<Trip> trips = env.addSource(consumer)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
        @Override
        public long extractTimestamp(Trip trip) {
            return trip.endTime.getTime();
        }
    });
KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
AllWindowedStream windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```

But not after `keyBy` and `process`:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
        @Override
        public long extractTimestamp(FeaturizedTrip trip) {
            return trip.endTime.getTime();
        }
    });
AllWindowedStream windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```
Windows are never triggered.

Is it a bug or expected behavior? If the latter, where is it documented?



Service discovery on YARN - find out which port was dynamically assigned to the JobManager Web Interface

2019-04-17 Thread Olivier Solliec
Hello,


I want to be able to register a flink cluster into a service discovery system 
(Consul in our case).

This flink cluster is scheduled on YARN.


Is there a way to know which port was assigned to the REST interface?


Via the rest API /jobmanager/config, I see a key "jobmanager.rpc.address" which 
is the correct yarn node, but both "rest.port" and "web.port" have "0" value.


The idea is to launch a cluster, retrieve its app id, use the yarn web ui proxy 
to get the right node address/port, and register this into Consul.
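
For what it's worth, a rough sketch of that lookup via the YARN ResourceManager
REST API (the RM address, the JSON handling, and the Consul registration are
placeholders; also note the address can change if YARN restarts the application
master, so the registration may need refreshing):

import java.io.InputStream;
import java.net.URL;
import java.util.Scanner;

public class YarnAppLookup {
    public static void main(String[] args) throws Exception {
        String rmAddress = "http://resourcemanager:8088"; // placeholder RM address
        String appId = args[0];                           // application id of the Flink cluster

        // The RM REST API returns application info, including the tracking URL,
        // which points at the YARN proxy in front of the Flink web/REST interface.
        URL url = new URL(rmAddress + "/ws/v1/cluster/apps/" + appId);
        try (InputStream in = url.openStream();
             Scanner s = new Scanner(in).useDelimiter("\\A")) {
            String json = s.hasNext() ? s.next() : "";
            // Parse "trackingUrl" (and/or "amHostHttpAddress") from the JSON with
            // your favorite JSON library, then register that address into Consul.
            System.out.println(json);
        }
    }
}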


Thank you,


Olivier


Re: What is the best way to handle data skew processing in Data Stream applications?

2019-04-17 Thread Felipe Gutierrez
I guess I could implement a solution which is not static and extends
Flink's OneInputStreamOperator.
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L84

Best,
Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*
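
As background for the key-group discussion in the quoted replies below, here is
a rough, simplified sketch of the two-step partitioning of keyed state (the real
implementation applies a murmur hash on top of hashCode(), so the numbers below
are only illustrative):

public class KeyGroupSketch {

    // Step 1: key -> key group (depends only on the key and maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // Step 2: key group -> operator subtask (depends on parallelism and maxParallelism).
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        // Two hot keys that land in the same key group stay together no matter
        // the parallelism; changing maxParallelism may separate them, and
        // changing the parallelism may move their key groups to other subtasks.
        System.out.println(subtaskFor(keyGroupFor("hotKeyA", 128), 4, 128));
        System.out.println(subtaskFor(keyGroupFor("hotKeyB", 128), 4, 128));
    }
}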


On Thu, Apr 11, 2019 at 2:21 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> thanks All for your suggestions!
>
> I am not sure whether for option 3 that Fabian described I will need to change
> the Flink source code or whether it can be implemented on top of Flink.
> -
> 3) One approach to improve the processing of skewed data, is to change how
> keyed state is handled.
> Flink's keyed state is partitioned in two steps:
> 1. each key is assigned to a key group based on an internal hash function.
> 2. each key group is assigned to and processed by a parallel operator task.
> For full control over data placement, you need to control both.
> Changing 1) is tricky because it affects savepoint compatibility.
> Changing 2) does not help if two hot keys are assigned to the same keyed
> state.
> -
> I did an experiment with a Mapper function that maps to a key with one more
> parameter (a skew parameter). The results are better.
>
> Integer skewParameter = 0;
> if (stationId.equals(new Integer(2)) && platformId.equals(new Integer(3)))
> { // this is the skewed key
> skewParameter = this.skewParameterGenerator.getNextItem();
> }
> CompositeSkewedKeyStationPlatform compositeKey = new
> CompositeSkewedKeyStationPlatform(stationId, platformId, skewParameter);
>
> But it is still a static solution =(. I mean, the developer has to specify in
> the Mapper which key is skewed.
>
> Best,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
> On Thu, Apr 11, 2019 at 1:49 PM Till Rohrmann 
> wrote:
>
>> Just a small addition:
>>
>> If two hot keys fall into two key groups which are being processed by the
>> same TM, then it could help to change the parallelism, because then the key
>> group mapping might be different.
>>
>> If two hot keys fall into the same key group, you can adjust the max
>> parallelism which defines how many key groups will be used. By changing the
>> number, it might happen that the two hot keys fall into different key
>> groups.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 11, 2019 at 9:22 AM Fabian Hueske  wrote:
>>
>>> Hi Felipe,
>>>
>>> three comments:
>>>
>>> 1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no
>>> effect:
>>> keyBy() introduces a hash partitioning such that any data partitioning
>>> that you do immediately before keyBy() is destroyed.
>>> You only change the distribution for the call of the key extractor which
>>> should be a lightweight function anyway.
>>> That's why you do not see any difference between the three methods.
>>>
>>> 2) windowAll() defines a non-keyed window over the whole stream.
>>> All records are processed by the same non-parallel instance of the
>>> window operator.
>>> That's why assigning a higher parallelism to that operator does not help.
>>>
>>> 3) One approach to improve the processing of skewed data, is to change
>>> how keyed state is handled.
>>> Flink's keyed state is partitioned in two steps:
>>> 1. each key is assigned to a key group based on an internal hash
>>> function.
>>> 2. each key group is assigned to and processed by a parallel operator
>>> task.
>>> For full control over data placement, you need to control both.
>>> Changing 1) is tricky because it affects savepoint compatibility.
>>> Changing 2) does not help if two hot keys are assigned to the same keyed
>>> state.
>>>
>>> Best, Fabian
>>>
>>> Am Mi., 10. Apr. 2019 um 11:50 Uhr schrieb Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com>:
>>>
 Hi,

 I am studying data skew processing in Flink and how I can change the
 low-level control of physical partition (
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning)
 in order to have an even processing of tuples. I have created synthetic
 skewed data sources and I aim to process (aggregate) them over a window.
 Here is the complete code:
 https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61

 streamTrainsStation01.union(streamTrainsStation02)

Re: Re: Is it possible to handle late data when using table API?

2019-04-17 Thread Lasse Nedergaard
Hi Hequn

Thanks for the details. I will give it a try. 
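
For reference, the retention interval Hequn mentions below could be set roughly
like this (Flink 1.6-era Table API, so method names may differ in other
versions; tableEnv and res30MinWindows refer to the quoted query, and the
retention values are placeholders for the expected maximum delay):

StreamQueryConfig qConfig = tableEnv.queryConfig();
// keep idle state at least as long as the maximum expected delay of the devices
qConfig.withIdleStateRetentionTime(Time.days(3), Time.days(4));
tableEnv.toRetractStream(res30MinWindows, Row.class, qConfig).print();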

Med venlig hilsen / Best regards
Lasse Nedergaard


> On 17 Apr 2019, at 04:09, Hequn Cheng wrote:
> 
> Hi Lasse,
> 
> > some devices can deliver data days back in time and I would like to have 
> > the results as fast as possible.
> 
> What JingsongLee said is correct.
> 
> However, it's possible to handle your problem with the Table API according to
> your description above. You can use the non-window (or unbounded) aggregate [1].
> The non-window aggregate supports early firing, i.e., it outputs results
> immediately once there is an update, so you can "have the results as fast as
> possible". The query looks like:
> 
> Table res30MinWindows = machineInsights
>     .select("UserActionTime / (30 * 60) as windowId, machineId, machineInsightId, value")
>     .groupBy("windowId, machineId, machineInsightId")
>     .select("machineId, machineInsightId, windowId as wStart, windowId + 1800 as sEnd, value.max as max")
> 
> The only thing you have to be aware of is that, as the non-window aggregate
> keeps all (result) data in its state, the state required to compute the query
> result might grow infinitely, depending on the type of aggregation and the
> number of distinct grouping keys. To solve this problem, you can provide a
> query configuration with a valid retention interval to prevent excessive
> state size [2]. In your case, I think the valid retention interval would be
> the max delay interval of your data.
> 
> Best, Hequn
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/tableApi.html#aggregations
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html
> 
> 
>> On Tue, Apr 16, 2019 at 5:38 PM Lasse Nedergaard  
>> wrote:
>> Hi
>> 
>> Thanks for the fast reply. Unfortunately it is not an option, as some devices
>> can deliver data days back in time and I would like to have the results as
>> fast as possible.
>> I will have to convert my implementation to use the streaming API instead.
>> 
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>> 
>> 
>>> On 16 Apr 2019, at 11:08, JingsongLee wrote:
>>> 
>>> Hi @Lasse Nedergaard, the Table API doesn't have an allowedLateness API.
>>> But you can set rowtime.watermarks.delay on the source to slow down the
>>> watermark clock.
>>> 
>>> --
>>> From: Lasse Nedergaard
>>> Sent: Tuesday, 16 April 2019, 16:20
>>> To: user
>>> Subject: Is it possible to handle late data when using table API?
>>> 
>>> Hi.
>>> 
>>> I have a simple tumble window working on eventtime like this.
>>> 
>>> Table res30MinWindows = machineInsights
>>>     .window(Tumble.over("30.minutes").on("UserActionTime").as("w")) // define window
>>>     .groupBy("machineId, machineInsightId, w") // group by key and window
>>>     .select("machineId, machineInsightId, w.start, w.end, w.rowtime, value.max as max"); // access window properties and aggregate
>>> As we work with IoT units we don't have 100% control over the reported event
>>> time and therefore need to handle late data to ensure that we don't get our
>>> calculation wrong.
>>> I would like to know if there is any option in the Table API to get access
>>> to late data, or is my only option to use the Streaming API?
>>> Thanks in advance
>>> Lasse Nedergaard
>>> 
>>> 


Re: How would I use OneInputStreamOperator to deal with data skew?

2019-04-17 Thread Kurt Young
There is no reason for it; the operator and function don't rely on the
logic that AbstractUdfStreamOperator supplies.

Best,
Kurt


On Wed, Apr 17, 2019 at 4:35 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Thanks for the tip! I guess now it is working as it should be
> 
> .
>
> Just one last question. Why did you decide to use "AbstractStreamOperator"
> instead of "AbstractUdfStreamOperator". I am asking because I was basing my
> solution also (I also looked at your solution) on the "StreamFlatMap
> "
> class implementation.
>
> Best,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
> On Wed, Apr 17, 2019 at 4:13 AM Kurt Young  wrote:
>
>> I think you might have mixed some test code with the operator. "List
>> getOutputs()" is from "TestMapBundleFunction" and is only used for
>> validation.
>> For the real usage, you need to write whatever records you want to emit
>> to the "collector" which is passed in during "finishBundle".
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi Kurt,
>>>
>>> How do you make the finishBundle
>>> 
>>> method returns the combined tuples? I saw that there is a method
>>> "List getOutputs()" which is never called.
>>>
>>> I did an implementation
>>> 
>>> based on the example that you suggested. The MapBundleFunctionImpl
>>> 
>>>  class
>>> has the method finishBundle which iterate all the combined tuples and
>>> return it. However, my application does not continue to receive tuples
>>> after the transform method
>>> 
>>> .
>>>
>>> Thanks,
>>> Felipe
>>>
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> *
>>>
>>>
>>> On Tue, Apr 16, 2019 at 3:10 AM Kurt Young  wrote:
>>>
 I think you can simply copy the source codes to your project if maven
 dependency can not be used.

 Best,
 Kurt


 On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
 felipe.o.gutier...@gmail.com> wrote:

> Hi again Kurt,
>
> could you please help me with the pom.xml file? I have included
> all Table ecosystem dependencies and the flink-table-runtime-blink as 
> well.
> However the class org.apache.flink.table.runtime.context.ExecutionContext
> is still not found. I guess I am missing some dependency, but I do not 
> know
> which. This is my pom.xml file.
>
> http://maven.apache.org/POM/4.0.0;
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
> 4.0.0
>
> org.sense.flink
> explore-flink
> 0.0.1-SNAPSHOT
> jar
>
> explore-flink
> http://maven.apache.org
>
> 
> UTF-8
> 1.8
> 2.11
> 
> 1.9-SNAPSHOT
> 4.12
> 
>
> 
> 
> org.apache.flink
> flink-java
> ${flink.version}
> 
> 
> org.apache.flink
> flink-clients_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-streaming-java_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-metrics-dropwizard
> ${flink.version}
> provided
> 
>
> 
> 
> org.apache.flink
>
> flink-table-api-java-bridge_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-table-api-scala-bridge_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-table-planner_${scala.binary.version}
> ${flink.version}
> 
>
> 
> org.apache.flink
> flink-streaming-scala_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-table-runtime-blink
> ${flink.version}
> 
>
> 
> org.fusesource.mqtt-client
> mqtt-client
> 1.15

Re: How would I use OneInputStreamOperator to deal with data skew?

2019-04-17 Thread Kurt Young
I mean no particular reason.

Best,
Kurt


On Wed, Apr 17, 2019 at 7:44 PM Kurt Young  wrote:

> There is no reason for it, the operator and function doesn't rely on the
> logic which AbstractUdfStreamOperator supplied.
>
> Best,
> Kurt
>
>
> On Wed, Apr 17, 2019 at 4:35 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Thanks for the tip! I guess now it is working as it should be
>> 
>> .
>>
>> Just one last question. Why did you decide to use
>> "AbstractStreamOperator" instead of "AbstractUdfStreamOperator". I am
>> asking because I was basing my solution also (I also looked at your
>> solution) on the "StreamFlatMap
>> "
>> class implementation.
>>
>> Best,
>> Felipe
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Wed, Apr 17, 2019 at 4:13 AM Kurt Young  wrote:
>>
>>> I think you might have mixed some test code with the operator.
>>> "List getOutputs()" is from "TestMapBundleFunction" and is only used
>>> for validation.
>>> For the real usage, you need to write whatever records you want to emit
>>> to the "collector" which is passed in during "finishBundle".
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
 Hi Kurt,

 How do you make the finishBundle
 
 method returns the combined tuples? I saw that there is a method
 "List getOutputs()" which is never called.

 I did an implementation
 
 based on the example that you suggested. The MapBundleFunctionImpl
 
  class
 has the method finishBundle which iterate all the combined tuples and
 return it. However, my application does not continue to receive tuples
 after the transform method
 
 .

 Thanks,
 Felipe

 *--*
 *-- Felipe Gutierrez*

 *-- skype: felipe.o.gutierrez*
 *--* *https://felipeogutierrez.blogspot.com
 *


 On Tue, Apr 16, 2019 at 3:10 AM Kurt Young  wrote:

> I think you can simply copy the source codes to your project if maven
> dependency can not be used.
>
> Best,
> Kurt
>
>
> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi again Kurt,
>>
>> could you please help me with the pom.xml file? I have included
>> all Table ecosystem dependencies and the flink-table-runtime-blink as 
>> well.
>> However the class org.apache.flink.table.runtime.context.ExecutionContext
>> is still not found. I guess I am missing some dependency, but I do not 
>> know
>> which. This is my pom.xml file.
>>
>> http://maven.apache.org/POM/4.0.0;
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>> 4.0.0
>>
>> org.sense.flink
>> explore-flink
>> 0.0.1-SNAPSHOT
>> jar
>>
>> explore-flink
>> http://maven.apache.org
>>
>> 
>> UTF-8
>> 1.8
>> 2.11
>> 
>> 1.9-SNAPSHOT
>> 4.12
>> 
>>
>> 
>> 
>> org.apache.flink
>> flink-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-clients_${scala.binary.version}
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-streaming-java_${scala.binary.version}
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-metrics-dropwizard
>> ${flink.version}
>> provided
>> 
>>
>> 
>> 
>> org.apache.flink
>>
>> flink-table-api-java-bridge_${scala.binary.version}
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>>
>> flink-table-api-scala-bridge_${scala.binary.version}
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-table-planner_${scala.binary.version}
>> ${flink.version}
>> 
>>
>> 

Re: Scalaj vs akka as http client for Asyncio Flink

2019-04-17 Thread Till Rohrmann
Check the logs to see what Akka is logging, and verify that the port you are
trying to bind to is free.

Cheers,
Till

On Wed, Apr 17, 2019 at 12:50 PM Andy Hoang  wrote:

> Hi Till,
>
> Sorry to bother you again. I managed to build and run the Akka HTTP client
> locally. After deploying to the YARN node, the ActorSystem can't be connected.
>  ```
> PPLogger.getActivityLogger.info("### 1")
> implicit  val system = ActorSystem("my-system")
> PPLogger.getActivityLogger.info("### 2")
> ```
> So the line "### 2" was never printed, and the method ended up timing out.
> Honestly I don't know how to debug this case.
>
> I'm just curious how people ended up using an async HTTP client without
> hassle?
>
> Thanks,
>
> Andy,
>
> On Apr 12, 2019, at 10:23 PM, Till Rohrmann  wrote:
>
> Hi Andy,
>
> you can do some micro benchmarks where you instantiate your
> AsyncHttpClient and call the invoke method. But better would be to
> benchmark it end-to-end by running it on a cluster with a realistic
> workload which you also expect to occur in production.
>
> Cheers,
> Till
>
> On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang 
> wrote:
>
>> Hi Till,
>> Unfortunately I have to wait for the cluster to upgrade to 1.8 to use
>> that feature: https://issues.apache.org/jira/browse/FLINK-6756
>> Meanwhile I can reimplement it in the copy-patse manner but I’m still
>> curious if my AsyncHttpClient
>>  work nicely or not, what would be the down side when you look at it.
>> I understand the open/close method is will help in term of init/cleaning
>> resource, but how can we benchmark the solution to make sure one is better
>> than the other? What is the key to decide here or we have to try it in
>> production first?
>> Thank a lot, again
>>
>> Andy,
>>
>>
>> On Apr 12, 2019, at 2:44 PM, Till Rohrmann  wrote:
>>
>> Hi Andy,
>>
>> there is also a Scala version of the `RichAsyncFunction`.
>>
>> In Scala you have to specify a value for class members. This is different
>> from Java.
>>
>> User code is first instantiated on the client where you create the job
>> topology (basically where you call new RichAsyncHttpClient). The code is
>> then serialized and shipped to the cluster where it is actually executed.
>>
>> Cheers,
>> Till
>>
>> On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang 
>> wrote:
>>
>>> Hi Till,
>>> Thanks for your reply. I managed to do some experiments, and the result was
>>> that some worked and some did not. I hope you can give me a bit more insight:
>>>
>>> As your suggestion to impl a `RichAsyncFunction` with transient field,
>>> like this and having error
>>>
>>> ```
>>> Class 'RichAsyncHttpClient' must either be declared abstract or
>>> implement abstract member 'executionContext: ExecutionContextExecutor' in
>>> ‘com.parcelperform.util.RichAsyncHttpClient’
>>> ```
>>>
>>> ```
>>>
>>> class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, 
>>> Either[Throwable, ResponseEntity]]{
>>>
>>>   PPLogger.getActivityLogger.info("###INIT --- ")
>>>   @transient implicit var materializer: ActorMaterializer
>>>   @transient implicit var system: ActorSystem
>>>   @transient implicit var executionContext: ExecutionContextExecutor
>>>
>>>
>>>   override def asyncInvoke(input: Shipment, resultFuture: 
>>> async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>
>>> val resultFutureRequested: Future[HttpResponse] =
>>>   Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>
>>> resultFutureRequested.onComplete {
>>>   case Success(res) => {
>>> resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
>>>   }
>>>   case Failure(x)   => {
>>> resultFuture.complete(Iterable(Either(x)).asJavaCollection)
>>>   }
>>> }
>>>   }
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>> super.open(parameters)
>>> system = ActorSystem("my-system")
>>> executionContext = system.dispatcher
>>> materializer = ActorMaterializer()
>>>   }
>>> }
>>>
>>> ```
>>>
>>> Also, with the usage of that class I get an error; I guess it's because of a
>>> Java/Scala issue. In the Flink docs, for Java code they use RichAsyncFunction
>>> and for Scala they use AsyncFunction:
>>> ```
>>> //AsyncDataStream.unorderedWait(streamShipment, new
>>> RichAsyncHttpClient(),5, TimeUnit.SECONDS, 2 ).print() <= ## error Type
>>> mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual:
>>> RichAsyncHttpClient
>>>
>>> ```
>>>
>>> ###
>>>
>>> So I try to fix my current code again with transient field and move it
>>> into constructor:
>>> ```
>>>
>>>
>>>
>>> class AsyncHttpClient( args: Array[String] = Array()) extends 
>>> AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>>>
>>>   @transient implicit lazy val system = {
>>> PPLogger.getActivityLogger.info("###INIT --- ")
>>> ActorSystem("my-system")
>>>   }
>>>   @transient implicit lazy val executionContext = {
>>> system.dispatcher
>>>   }
>>>   

Re: Scalaj vs akka as http client for Asyncio Flink

2019-04-17 Thread Andy Hoang
Hi Till,

Sorry to bother you again. I managed to build and run the Akka HTTP client
locally. After deploying to the YARN node, the ActorSystem can't be connected.
 ```
PPLogger.getActivityLogger.info("### 1")
implicit  val system = ActorSystem("my-system")
PPLogger.getActivityLogger.info("### 2")
```
So the line "### 2" was never printed, and the method ended up timing out. Honestly I
don't know how to debug this case.

I’m just curious how people ended up using an async HTTP client without hassle?

Thanks,

Andy,

> On Apr 12, 2019, at 10:23 PM, Till Rohrmann  wrote:
> 
> Hi Andy,
> 
> you can do some micro benchmarks where you instantiate your AsyncHttpClient 
> and call the invoke method. But better would be to benchmark it end-to-end by 
> running it on a cluster with a realistic workload which you also expect to 
> occur in production.
> 
> Cheers,
> Till
> 
> On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang  > wrote:
> Hi Till,
> Unfortunately I have to wait for the cluster to upgrade to 1.8 to use that 
> feature: https://issues.apache.org/jira/browse/FLINK-6756 
> 
> Meanwhile I can reimplement it in the copy-patse manner but I’m still curious 
> if my AsyncHttpClient
>  work nicely or not, what would be the down side when you look at it.
> I understand the open/close method is will help in term of init/cleaning 
> resource, but how can we benchmark the solution to make sure one is better 
> than the other? What is the key to decide here or we have to try it in 
> production first?
> Thank a lot, again
> 
> Andy,
> 
> 
>> On Apr 12, 2019, at 2:44 PM, Till Rohrmann > > wrote:
>> 
>> Hi Andy,
>> 
>> there is also a Scala version of the `RichAsyncFunction`.
>> 
>> In Scala you have to specify a value for class members. This is different 
>> from Java.
>> 
>> User code is first instantiated on the client where you create the job 
>> topology (basically where you call new RichAsyncHttpClient). The code is 
>> then serialized and shipped to the cluster where it is actually executed.
>> 
>> Cheers,
>> Till
>> 
>> On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang > > wrote:
>> Hi Till, 
>> Thanks for your reply. I managed to do some experiments, and the result was
>> that some worked and some did not. I hope you can give me a bit more insight:
>> 
>> As your suggestion to impl a `RichAsyncFunction` with transient field, like 
>> this and having error
>> 
>> ```
>> Class 'RichAsyncHttpClient' must either be declared abstract or implement 
>> abstract member 'executionContext: ExecutionContextExecutor' in 
>> ‘com.parcelperform.util.RichAsyncHttpClient’
>> ```
>> 
>> ```
>> class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, 
>> Either[Throwable, ResponseEntity]]{
>> 
>>   PPLogger.getActivityLogger.info("###INIT --- ")
>>   @transient implicit var materializer: ActorMaterializer
>>   @transient implicit var system: ActorSystem
>>   @transient implicit var executionContext: ExecutionContextExecutor
>> 
>> 
>>   override def asyncInvoke(input: Shipment, resultFuture: 
>> async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>> 
>> val resultFutureRequested: Future[HttpResponse] =
>>   Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>> 
>> resultFutureRequested.onComplete {
>>   case Success(res) => {
>> resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
>>   }
>>   case Failure(x)   => {
>> resultFuture.complete(Iterable(Either(x)).asJavaCollection)
>>   }
>> }
>>   }
>> 
>>   override def open(parameters: Configuration): Unit = {
>> super.open(parameters)
>> system = ActorSystem("my-system")
>> executionContext = system.dispatcher
>> materializer = ActorMaterializer()
>>   }
>> }
>> ```
>> 
>> Also, with the usage of that class I get an error; I guess it's because of a
>> Java/Scala issue. In the Flink docs, for Java code they use RichAsyncFunction
>> and for Scala they use AsyncFunction:
>> ```
>> //AsyncDataStream.unorderedWait(streamShipment, new 
>> RichAsyncHttpClient(),5, TimeUnit.SECONDS, 2 ).print() <= ## error Type 
>> mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: 
>> RichAsyncHttpClient
>> 
>> ```
>> 
>> ### 
>> 
>> So I try to fix my current code again with transient field and move it into 
>> constructor:
>> ```
>> 
>> 
>> class AsyncHttpClient( args: Array[String] = Array()) extends 
>> AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>> 
>>   @transient implicit lazy val system = {
>> PPLogger.getActivityLogger.info("###INIT --- ")
>> ActorSystem("my-system")
>>   }
>>   @transient implicit lazy val executionContext = {
>> system.dispatcher
>>   }
>>   @transient implicit lazy val materializer: ActorMaterializer = {
>> 

Fast restart of a job with a large state

2019-04-17 Thread Sergey Zhemzhitsky
Hi Flinkers,

Operating different Flink jobs, I've discovered that restarts of jobs with a
pretty large state (in my case up to 100 GB+) take quite a lot of time. For
example, to restart a job (e.g. to update it) a savepoint is created, and with
savepoints all of the state seems to be pushed to the distributed store (HDFS
in my case) when stopping the job and pulled back when starting the new
version of the job.

What I've found so far while trying to speed up job restarts is:
- using external retained checkpoints [1]; the drawback is that the
job cannot be rescaled during restart
- using external state and storage with the stateless jobs; the
drawback is the necessity of additional network hops to this storage.

So I'm wondering whether there are any best practices community knows
and uses to cope with the cases like this?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
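
For reference, retained (externalized) checkpoints [1] are enabled roughly like
this (the interval is a placeholder; this is only a sketch of the configuration,
not a full solution to the restart-time problem):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60s (placeholder)
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);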


kafka partitions, data locality

2019-04-17 Thread Smirnov Sergey Vladimirovich (39833)
Hello,

We are planning to use Apache Flink as a core component of our new streaming
system for internal processes (finance, banking business) based on Apache Kafka.
So we are starting some research with Apache Flink, and one of the questions
that arises during that work is how Flink handles data locality.
I'll try to explain: suppose we have a Kafka topic with some kind of events.
These events are grouped by topic partition, so that the handler (or a job
worker) consuming messages from a partition has all the necessary information
for further processing.
As an example, say we have clients' payment transactions in a Kafka topic. We
group by clientId (transactions with the same clientId go to the same Kafka
topic partition), and the task is to find the max transaction per client in
sliding windows. In map/reduce terms there is no need to shuffle data between
all topic consumers; maybe it's worth doing it within each consumer to gain
some speedup by increasing the number of executors per partition of data.
And my question is how Flink will work in this case. Does it shuffle all data,
or does it have some setting to avoid this extra, unnecessary shuffle/sorting
operation?
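
For concreteness, a minimal sketch of the pipeline described above in the
DataStream API (the Transaction class, its fields, env, and kafkaConsumer are
illustrative assumptions, not part of the original setup):

DataStream<Transaction> transactions = env.addSource(kafkaConsumer);

transactions
    .keyBy(tx -> tx.clientId)                       // keyBy implies a hash repartition
    .timeWindow(Time.minutes(10), Time.minutes(1))  // sliding window (placeholder sizes)
    .maxBy("amount")
    .print();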
Thanks in advance!


With best regards,
Sergey Smirnov


Re: How would I use OneInputStreamOperator to deal with data skew?

2019-04-17 Thread Felipe Gutierrez
Thanks for the tip! I guess now it is working as it should.

Just one last question: why did you decide to use "AbstractStreamOperator"
instead of "AbstractUdfStreamOperator"? I am asking because I was also basing
my solution (I also looked at your solution) on the "StreamFlatMap" class
implementation.

Best,
Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*
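
For reference, a minimal sketch of the bundle pattern Kurt describes in the
quoted reply below: buffer partial per-key results and flush them to the
operator's Collector when the bundle is finished. The class below is only an
illustration, not the actual blink-runtime MapBundleFunction API:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MaxPreAggregator {

    private final Map<String, Integer> bundle = new HashMap<>();

    // Called for every incoming record: keep only the max per key.
    public void addInput(Tuple2<String, Integer> record) {
        bundle.merge(record.f0, record.f1, Math::max);
    }

    // Called when the bundle is full (or a checkpoint barrier arrives):
    // emit the combined records to the collector and reset the buffer.
    public void finishBundle(Collector<Tuple2<String, Integer>> out) {
        for (Map.Entry<String, Integer> e : bundle.entrySet()) {
            out.collect(Tuple2.of(e.getKey(), e.getValue()));
        }
        bundle.clear();
    }
}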


On Wed, Apr 17, 2019 at 4:13 AM Kurt Young  wrote:

> I think you might have mixed some test code with the operator. "List
> getOutputs()" is from "TestMapBundleFunction" and is only used for
> validation.
> For the real usage, you need to write whatever records you want to emit to
> the "collector" which is passed in during "finishBundle".
>
> Best,
> Kurt
>
>
> On Wed, Apr 17, 2019 at 12:50 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Kurt,
>>
>> How do you make the finishBundle
>> 
>> method returns the combined tuples? I saw that there is a method
>> "List getOutputs()" which is never called.
>>
>> I did an implementation
>> 
>> based on the example that you suggested. The MapBundleFunctionImpl
>> 
>>  class
>> has the method finishBundle which iterate all the combined tuples and
>> return it. However, my application does not continue to receive tuples
>> after the transform method
>> 
>> .
>>
>> Thanks,
>> Felipe
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Tue, Apr 16, 2019 at 3:10 AM Kurt Young  wrote:
>>
>>> I think you can simply copy the source codes to your project if maven
>>> dependency can not be used.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
 Hi again Kurt,

 could you please help me with the pom.xml file? I have included
 all Table ecosystem dependencies and the flink-table-runtime-blink as well.
 However the class org.apache.flink.table.runtime.context.ExecutionContext
 is still not found. I guess I am missing some dependency, but I do not know
 which. This is my pom.xml file.

 http://maven.apache.org/POM/4.0.0;
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
 http://maven.apache.org/xsd/maven-4.0.0.xsd;>
 4.0.0

 org.sense.flink
 explore-flink
 0.0.1-SNAPSHOT
 jar

 explore-flink
 http://maven.apache.org

 
 UTF-8
 1.8
 2.11
 
 1.9-SNAPSHOT
 4.12
 

 
 
 org.apache.flink
 flink-java
 ${flink.version}
 
 
 org.apache.flink
 flink-clients_${scala.binary.version}
 ${flink.version}
 
 
 org.apache.flink
 flink-streaming-java_${scala.binary.version}
 ${flink.version}
 
 
 org.apache.flink
 flink-metrics-dropwizard
 ${flink.version}
 provided
 

 
 
 org.apache.flink

 flink-table-api-java-bridge_${scala.binary.version}
 ${flink.version}
 
 
 org.apache.flink

 flink-table-api-scala-bridge_${scala.binary.version}
 ${flink.version}
 
 
 org.apache.flink
 flink-table-planner_${scala.binary.version}
 ${flink.version}
 

 
 org.apache.flink
 flink-streaming-scala_${scala.binary.version}
 ${flink.version}
 
 
 org.apache.flink
 flink-table-runtime-blink
 ${flink.version}
 

 
 org.fusesource.mqtt-client
 mqtt-client
 1.15
 
 

 
 org.slf4j
 slf4j-api
 1.7.26
 
 
 org.slf4j
 slf4j-log4j12
 1.7.26
 

 
 junit
 junit
 ${junit.version}
 
 
 
 explore-flink
 
 
 
 org.apache.maven.plugins
 maven-eclipse-plugin
 2.10
 
 true
 false
 
 

 
 
 org.apache.maven.plugins
 maven-compiler-plugin