Re: Does Flink release Hadoop(R) 2.8 work with Hadoop 3?

2018-07-27 Thread vino yang
Hi Mich,

I think this depends on the backward compatibility of the Hadoop client
API. In theory, there is no problem.
Hadoop 2.8 to Hadoop 3.0 is a very large upgrade, so I personally recommend
using a client version that is consistent with the Hadoop cluster.
By compiling and packaging from source, you can let Flink bundle a specific
Hadoop version. Flink has a shaded Hadoop module, see here [1]. You can try
changing the Hadoop version used by the Maven build (e.g. the hadoop.version
property) and repackaging the project.

[1]: https://github.com/apache/flink/tree/master/flink-shaded-hadoop

Thanks, vino.

2018-07-27 23:31 GMT+08:00 Mich Talebzadeh :

> Hi,
>
> I can run Flink without bundled Hadoop fine. I was wondering if Flink with
> Hadoop 2.8 works with Hadoop 3 as well?
>
> Thanks,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Flink Cluster and Pipeline Version Compatibility?

2018-07-27 Thread vino yang
Hi Jlist9:

Most of Flink's APIs try to ensure backward compatibility, but there is no
documentation that lists all the APIs that do. As Flink has evolved, some
features have changed dramatically, such as State, so Flink's official
website provides a migration guide [1]. My personal suggestion is that the
best choice is to keep the cluster consistent with the version of the Flink
API that the job depends on, especially the major version of Flink.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/migration.html

Thanks, vino.

2018-07-27 19:57 GMT+08:00 Hequn Cheng :

> Hi, jlist9
>
> >  Is it so that the pipeline jars must be built with the same version of
> the cluster they'll be running on?
> Most interfaces are supported for backward compatibility. And the closer
> the Flink versions are, the smaller the differences between interfaces.
> However, it is not guaranteed. Hence, it is recommended to build your jars
> with the same version as the cluster.
>
> Best, Hequn
>
> On Fri, Jul 27, 2018 at 1:59 AM, jlist9  wrote:
>
>> I was trying to find some compatibility tables between various versions
>> of Flink clusters and pipeline jars but haven't run into any so far. Is it
>> so that the pipeline jars must be built with the same version of the
>> cluster they'll be running on? Or is there some backward compatibility? If
>> it's already documented/discussed somewhere a link would be appreciated.
>>
>> Jack
>>
>
>


Re: How to connect more than 2 heterogeneous Streams!!

2018-07-27 Thread vino yang
Hi Puneet,

Hequn gave you two good solutions. If you have a good knowledge of the Flink
DataStream API, you can also build your own way of connecting more than two
streams. You should know:

   - DataStream#connect API
   - ConnectedStreams
   - CoMapFunction, CoFlatMapFunction...

Referring to them, you can build connect(DataStream1, DataStream2, ...) yourself by chaining connect() calls; see the sketch below.
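
For illustration, a minimal, self-contained sketch (the String/Integer/Double
streams and the class name ThreeStreamConnect are hypothetical) of chaining
connect() twice to combine three streams of different types:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ThreeStreamConnect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> names = env.fromElements("a", "b");
        DataStream<Integer> counts = env.fromElements(1, 2);
        DataStream<Double> scores = env.fromElements(0.5, 1.5);

        // Step 1: connect the first two streams and map both sides to a common type.
        DataStream<String> firstTwo = names
                .connect(counts)
                .map(new CoMapFunction<String, Integer, String>() {
                    @Override
                    public String map1(String value) { return "name:" + value; }
                    @Override
                    public String map2(Integer value) { return "count:" + value; }
                });

        // Step 2: connect the intermediate result with the third stream.
        DataStream<String> all = firstTwo
                .connect(scores)
                .map(new CoMapFunction<String, Double, String>() {
                    @Override
                    public String map1(String value) { return value; }
                    @Override
                    public String map2(Double value) { return "score:" + value; }
                });

        all.print();
        env.execute("three-stream-connect");
    }
}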

Indeed, some scenarios, such as CEP, need more than two streams to be connected.

Thanks, vino.


2018-07-27 19:48 GMT+08:00 Hequn Cheng :

> Hi Puneet,
>
> Flink doesn't support connecting more than 2 streams with different
> schemas. There are two ways that I think might help you.
> 1. unify the schema and use union.
> 2. use multiple joins to join the different streams.
>
> Hope this helps.
> Hequn
>
> On Thu, Jul 26, 2018 at 2:29 PM, Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Hi
>>
>> Is there a way to connect more than 2 streams with different stream schema
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> *
>>
>> *e-mail :puneet.ki...@customercentria.com
>> *
>>
>>
>>
>


Re: Order of events in a Keyed Stream

2018-07-27 Thread Hequn Cheng
Hi Harshvardhan,

There are a number of factors to consider.
1. The consecutive Kafka messages must be in the same Kafka topic (and partition).
2. The data should not be rebalanced. For example, operators should be
chained in order to avoid rebalancing.
3. If you perform keyBy(), you should key by a field for which the two
consecutive messages share the same value (see the sketch below).
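
To make point 3 concrete, a minimal sketch (the Position POJO and account
values are hypothetical; the real job would read positions from Kafka) of
keying by the shared account number field:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByAccountExample {

    // Hypothetical position record; in the real job this would be deserialized from Kafka.
    public static class Position {
        public String accountNumber;
        public double quantity;
        public Position() {}
        public Position(String accountNumber, double quantity) {
            this.accountNumber = accountNumber;
            this.quantity = quantity;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Position> positions = env.fromElements(
                new Position("ACC-1", 100), new Position("ACC-1", 150));

        // All updates for the same account number go to the same parallel task
        // instance, so per-key arrival order is preserved there.
        KeyedStream<Position, String> byAccount = positions.keyBy(
                new KeySelector<Position, String>() {
                    @Override
                    public String getKey(Position p) {
                        return p.accountNumber;
                    }
                });

        byAccount.print();
        env.execute("key-by-account");
    }
}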

Best, Hequn

On Sat, Jul 28, 2018 at 12:11 AM, Harshvardhan Agrawal <
harshvardhan.ag...@gmail.com> wrote:

> Hi,
>
>
> We are currently using Flink to process financial data. We are getting
> position data from Kafka and we enrich the positions with account and
> product information. We are using Ingestion time while processing events.
> The question I have is: say I key the position datastream by account number.
> If I have two consecutive Kafka messages with the same account and product
> info where the second one is an updated position of the first one, does
> Flink guarantee that the messages will be processed on the same slot in the
> same worker? We want to ensure that we don’t process them out of order.
>
> Thank you!
> --
> Regards,
> Harshvardhan
>


Re: AsyncFunction used twice with Asyncdatastream.unorderedWait - 2nd function's asyncInvoke not getting called

2018-07-27 Thread Vijay Balakrishnan
Hi,

Turns out the issue was with the RichParallelSourceFunction I was using: it
resulted in the Sink not getting called after the connect/SyncLatchFunction.
I still need to figure out the issue there, but the 2 asyncInvoke functions
work fine now after I replaced the ParallelCameraSource (a
RichParallelSourceFunction) with the old CheckpointedCameraWithCubeSource.

DataStream<CameraWithCube> keyedByCamCameraStream = env
        .addSource(new CheckpointedCameraWithCubeSource(maxSeqCnt, servingSpeedMs, startTime, nbrCameras, outputFile), "TileDB Camera")
        .uid("TileDB-Camera")
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
                cameraWithCube.cameraKey.getCam() : new Object())
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public void processElement(CameraWithCube value, Context ctx,
                    Collector<CameraWithCube> out) throws Exception {
                out.collect(value);
            }
        })
        .setParallelism(parallelCamTasks);

/*
DataStream<CameraWithCube> keyedByCamCameraStream = env
        .addSource(new ParallelCameraSource(maxSeqCnt, servingSpeedMs, startTime, nbrCameras, outputFile), "TileDB Camera")
        .uid("TileDB-Camera").setParallelism(parallelCamTasks)
        .partitionCustom((Partitioner) (key, numPartitions) -> { return key % numPartitions; },
                new KeySelector() {
                    @Override
                    public Integer getKey(CameraWithCube cameraWithCube) throws Exception {
                        ...;
                    }
                });
*/
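
For readers following the thread, a minimal, self-contained sketch of the
AsyncFunction-with-ExecutorService pattern described in the quoted mail below
(UppercaseAsyncFunction and the String types are hypothetical, not the
SampleCopy/SampleSink functions from the original job):

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class UppercaseAsyncFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(4);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // hand the blocking work to the pool and complete the future when done;
        // used e.g. as AsyncDataStream.unorderedWait(stream, new UppercaseAsyncFunction(), 5000, TimeUnit.MILLISECONDS, 100)
        executor.submit(() -> resultFuture.complete(Collections.singleton(input.toUpperCase())));
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}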



Vijay

On Thu, Jul 26, 2018 at 10:39 PM Vijay Balakrishnan 
wrote:

> Hi,
>
> I have 2 AsyncFunctions SampleCopyAsyncFunction and
> SampleSinkAsyncFunction called with AsyncDataStream.unorderedWait. The 1st
>  AsyncDataStream.unorderedWait’s SampleCopyAsyncFunction .asyncInvoke
> gets called properly but the 2nd SampleSinkAsyncFunction.asyncInvoke never
> gets called(though open and close functions are called). Is there any way
> for me to have the 2nd asyncInvoke get called ? I have an 
> Executors.newFixedThreadPool(..)
> that I use within each AsyncFunction.
>
>
>
>
> TIA
>
>
>
>
>
> Here is the code:
>
>
>
> AsyncFunction cameraWithCubeAsyncFunction =
>
> new SampleCopyAsyncFunction(shutdownWaitTS, inputFile,
> options, nThreads);
>
> DataStream cameraWithCubeDataStreamAsync =
>
> AsyncDataStream.unorderedWait(keyedByCamCameraStream,
> cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)
>
>
> .setParallelism(parallelCamTasks);//.startNewChain()
>
> DataStream cameraWithCubeDataStream =
> cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) ->
> cameraWithCube.cameraKey != null ?
>
> cameraWithCube.cameraKey.getTs() : new Object());
>
> String uuid = UUID.randomUUID().toString();
>
> DataStream>
> enrichedCameraFeed = inputMetadataDataStream
>
> .connect(cameraWithCubeDataStream)
>
> .flatMap(new SyncLatchFunction(outputFile, outputPath,
> uuid))
>
> .uid("connect2Streams")
>
> .setParallelism(1);
>
> AsyncFunction,
> Tuple2> cubeSinkAsyncFunction =
>
> new SampleSinkAsyncFunction(shutdownWaitTS, outputPath,
> options, nThreads, uuid);
>
> DataStream>
> enrichedCameraFeedSinkAsync =
>
> AsyncDataStream.unorderedWait(enrichedCameraFeed,
> cubeSinkAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)
>
> .setParallelism(parallelCubeTasks)
>
> .uid("Read-Image-Async");//ç== asyncInvoke never
> gets called for 2nd AsyncFunction
>
> DataStream>
> enrichedCameraFeedSinkAsyncDataStream =
> enrichedCameraFeedSinkAsync.keyBy((tuple2) -> tuple2.f0.inputMetadataKey !=
> null ?
>
> tuple2.f0.inputMetadataKey.getTs() : new Object());
>
> //enrichedCameraFeedSinkAsyncDataStream.print(); // <== this doesn’t
> work
>
> enrichedCameraFeedSinkAsyncDataStream.addSink(new
> CubeProcessingSink(options, outputPath, uuid)) //, shutdownWaitTS
>
> .setParallelism(parallelCubeTasks)
>
> .uid("Cube-Sink");
>


Re: RuntimeException with valve output watermark when using CoGroup

2018-07-27 Thread Chesnay Schepler
At first glance this looks like a bug. Is there nothing in the stack trace
after the NullPointerException?


How reliably can you reproduce this?

On 27.07.2018 19:00, Taneli Saastamoinen wrote:

Morning everyone,

I'm getting the following exception in my Flink job (Flink version is 
1.5.0):


java.lang.RuntimeException: Exception occurred while processing valve 
output watermark:
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)


I'm not sure why this is happening but I suspect it could be a bug in 
Flink.


My code is too proprietary to be shared directly, but here's the 
general gist. I'm getting data in as JSON, parsing it into POJOs, and 
then aggregating those with coGroup(), taking the maximum of two 
separate fields. I then take the results of this and aggregate it 
again, taking the average of these maximums grouped by a different 
field. In pseudocode:


// first aggregation

parsed = source
.flatMap(new JsonConverterAndFilterer())
.assignTimestampsAndWatermarks(new 
MyTimestampExtractor(MAX_OUT_OF_ORDERNESS));


X = parsed.filter(q -> q.getX() != null);

Y = parsed.filter(q -> q.getY() != null).map(AggregatedPojoClass::new);

joined = X
.coGroup(Y)
.where(PojoClass::getId)
.equalTo(AggregatedPojoClass::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
.apply(findMaximums());

// findMaximums() returns:

@Override
public void coGroup(Iterable xMaybe, 
Iterable yMaybe, Collector 
collector) throws Exception {

final Iterator x = xMaybe.iterator();
final Iterator y = yMaybe.iterator();
if(x.hasNext() && y.hasNext()) {
final PojoClass maxX = findMaxX(x);
final AggregatedPojoClass maxY = findMaxY(y);
if(maxX != null && maxY != null) {
collector.collect(new 
AggregatedPojoClass(maxX).updateMaxY(maxY.getY()));

} else {
log.warn("[CoGroup case 1] Max X or max Y is null - 
SKIPPING: max x {}, max y {}", maxX, maxY);

}
} // ...other cases omitted for brevity...

// second aggregation

final DataStream result = joined
.keyBy(AggregatedPojoClass::getSecondaryId)
.window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
.apply(new WindowAverageFunction());

final DataStream resultJson = result
.map(JsonUtil::toJson);
// then a simple sink on this

// WindowAverageFunction is:

public class WindowAverageFunction implements 
WindowFunction<AggregatedPojoClass, ?, String, TimeWindow> {

@Override
public void apply(String secondaryId, TimeWindow timeWindow, 
Iterable input, Collector 
out) throws Exception {

final Iterator i = input.iterator();
if(!i.hasNext()) {
log.warn("Got empty window for secondary id '{}' - 
ignoring...", secondaryId);

return;
}
// calculate some simple averages of the X and Y...


Now when I run this code with some real data, an exception happens on 
line 88 here, inside the custom coGroup() function above (I've added 
line numbers to clarify):


87if(maxY != null) {
88collector.collect(maxY);
89} else {
90log.warn("[CoGroup case 3] Max Y null - SKIPPING");
91}

The stack trace is as follows:

java.lang.RuntimeException: Exception occurred while processing valve 
output watermark:
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at 
com.mycorp.flink.AggregateStuffFlinkJob$1.coGroup(AggregateStuffFlinkJob.java:88)
at 
org.apache.flink.streami

RuntimeException with valve output watermark when using CoGroup

2018-07-27 Thread Taneli Saastamoinen
Morning everyone,

I'm getting the following exception in my Flink job (Flink version is
1.5.0):

java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)

I'm not sure why this is happening but I suspect it could be a bug in Flink.

My code is too proprietary to be shared directly, but here's the general
gist. I'm getting data in as JSON, parsing it into POJOs, and then
aggregating those with coGroup(), taking the maximum of two separate
fields. I then take the results of this and aggregate it again, taking the
average of these maximums grouped by a different field. In pseudocode:

// first aggregation

parsed = source
.flatMap(new JsonConverterAndFilterer())
.assignTimestampsAndWatermarks(new
MyTimestampExtractor(MAX_OUT_OF_ORDERNESS));

X = parsed.filter(q -> q.getX() != null);

Y = parsed.filter(q -> q.getY() != null).map(AggregatedPojoClass::new);

joined = X
.coGroup(Y)
.where(PojoClass::getId)
.equalTo(AggregatedPojoClass::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
.apply(findMaximums());

// findMaximums() returns:

@Override
public void coGroup(Iterable xMaybe,
Iterable yMaybe, Collector
collector) throws Exception {
final Iterator x = xMaybe.iterator();
final Iterator y = yMaybe.iterator();
if(x.hasNext() && y.hasNext()) {
final PojoClass maxX = findMaxX(x);
final AggregatedPojoClass maxY = findMaxY(y);
if(maxX != null && maxY != null) {
collector.collect(new
AggregatedPojoClass(maxX).updateMaxY(maxY.getY()));
} else {
log.warn("[CoGroup case 1] Max X or max Y is null - SKIPPING:
max x {}, max y {}", maxX, maxY);
}
} // ...other cases omitted for brevity...

// second aggregation

final DataStream result = joined
.keyBy(AggregatedPojoClass::getSecondaryId)
.window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
.apply(new WindowAverageFunction());

final DataStream resultJson = result
.map(JsonUtil::toJson);
// then a simple sink on this

// WindowAverageFunction is:

public class WindowAverageFunction implements
WindowFunction {
@Override
public void apply(String secondaryId, TimeWindow timeWindow,
Iterable input, Collector out)
throws Exception {
final Iterator i = input.iterator();
if(!i.hasNext()) {
log.warn("Got empty window for secondary id '{}' -
ignoring...", secondaryId);
return;
}
// calculate some simple averages of the X and Y...


Now when I run this code with some real data, an exception happens on line
88 here, inside the custom coGroup() function above (I've added line
numbers to clarify):

87if(maxY != null) {
88collector.collect(maxY);
89} else {
90log.warn("[CoGroup case 3] Max Y null - SKIPPING");
91}

The stack trace is as follows:

java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.mycorp.flink.AggregateStuffFlinkJob$1.coGroup(AggregateStuffFlinkJob.java:88)
at
org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:683)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
at
org.ap

Order of events in a Keyed Stream

2018-07-27 Thread Harshvardhan Agrawal
Hi,


We are currently using Flink to process financial data. We are getting
position data from Kafka and we enrich the positions with account and
product information. We are using Ingestion time while processing events.
The question I have is: say I key the position datastream by account number.
If I have two consecutive Kafka messages with the same account and product
info where the second one is an updated position of the first one, does
Flink guarantee that the messages will be processed on the same slot in the
same worker? We want to ensure that we don’t process them out of order.

Thank you!
-- 
Regards,
Harshvardhan


Does Flink release Hadoop(R) 2.8 work with Hadoop 3?

2018-07-27 Thread Mich Talebzadeh
Hi,

I can run Flink without bundled Hadoop fine. I was wondering if Flink with
Hadoop 2.8 works with Hadoop 3 as well?

Thanks,

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Committing Kafka Transactions during Savepoint

2018-07-27 Thread Scott Kidder
Thank you, Aljoscha! Are Kafka transactions committed when a running job
has been instructed to cancel with a savepoint (e.g. `flink cancel -s
`)? This is my primary use for savepoints. I would expect that when a
new job is submitted with the savepoint, as in the case of an application
upgrade, Flink will create a new Kafka transaction and processing will be
exactly-once.
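
For context, a minimal sketch (broker address, topic and class name are
hypothetical) of the exactly-once Kafka 0.11 producer setup being discussed;
the open question in this thread is when exactly its transactions are
committed relative to checkpoints and savepoints:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceKafkaSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // the producer's transactions are tied to checkpoint completion

        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // hypothetical broker
        // should exceed the maximum expected checkpoint/savepoint duration
        props.setProperty("transaction.timeout.ms", "900000");

        stream.addSink(new FlinkKafkaProducer011<>(
                "output-topic",                                          // hypothetical topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once-kafka-sink");
    }
}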

--Scott Kidder

On Fri, Jul 27, 2018 at 5:09 AM Aljoscha Krettek 
wrote:

> Hi,
>
> this has been in the back of my head for a while now. I finally created a
> Jira issue: https://issues.apache.org/jira/browse/FLINK-9983
>
> In there, I also outline a better fix that will take a bit longer to
> implement.
>
> Best,
> Aljoscha
>
> On 26. Jul 2018, at 23:04, Scott Kidder  wrote:
>
> I recently began using the exactly-once processing semantic with the Kafka
> 0.11 producer in Flink 1.4.2. It's been working great!
>
> Are Kafka transactions committed when creating a Flink savepoint? How does
> this affect the recovery behavior in Flink if, before the completion of the
> next checkpoint, the application is restarted and restores from a
> checkpoint taken before the savepoint? It seems like this might lead to the
> Kafka producer writing a message multiple times with different committed
> Kafka transactions.
>
> --
> Scott Kidder
>
>
>


Re: Committing Kafka Transactions during Savepoint

2018-07-27 Thread Aljoscha Krettek
Hi,

this has been in the back of my head for a while now. I finally created a Jira 
issue: https://issues.apache.org/jira/browse/FLINK-9983 


In there, I also outline a better fix that will take a bit longer to implement.

Best,
Aljoscha

> On 26. Jul 2018, at 23:04, Scott Kidder  wrote:
> 
> I recently began using the exactly-once processing semantic with the Kafka 
> 0.11 producer in Flink 1.4.2. It's been working great!
> 
> Are Kafka transactions committed when creating a Flink savepoint? How does 
> this affect the recovery behavior in Flink if, before the completion of the 
> next checkpoint, the application is restarted and restores from a checkpoint 
> taken before the savepoint? It seems like this might lead to the Kafka 
> producer writing a message multiple times with different committed Kafka 
> transactions.
> 
> --
> Scott Kidder



Re: Flink Cluster and Pipeline Version Compatibility?

2018-07-27 Thread Hequn Cheng
Hi, jlist9

>  Is it so that the pipeline jars must be built with the same version of
the cluster they'll be running on?
Most interfaces are supported for backward compatibility. And the closer
the Flink versions are, the smaller the differences between interfaces.
However, it is not guaranteed. Hence, it is recommended to build your jars
with the same version as the cluster.

Best, Hequn

On Fri, Jul 27, 2018 at 1:59 AM, jlist9  wrote:

> I was trying to find some compatibility tables between various versions of
> Flink clusters and pipeline jars but haven't run into any so far. Is it so
> that the pipeline jars must be built with the same version of the cluster
> they'll be running on? Or is there some backward compatibility? If it's
> already documented/discussed somewhere a link would be appreciated.
>
> Jack
>


Re: Custom Window example (data-based)

2018-07-27 Thread vino yang
Hi Chris,

I just found some resources you can have a look at, listed below:


   - Flink official documentation:
   https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#built-in-and-custom-triggers
   - Custom, Complex Windows at Scale using Apache Flink (video):
   https://www.youtube.com/watch?v=XUvqnsWm8yo
   - Custom, Complex Windows @Scale Using Apache Flink (video):
   https://www.infoq.com/presentations/events-windows-apache-flink
   - Custom Trigger:
   https://gist.github.com/mxm/c5831ead9c9d9ad68731f5f2f3793154
   - Flink training about Windows and aggregation:
   https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-windows-time

Hope some of them can help you.
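
In addition, a minimal sketch of a data-driven Trigger (the Event type, its
amount field and the threshold are hypothetical) that fires a window early as
soon as one element exceeds a threshold and otherwise falls back to the
window's normal event-time firing; it would be plugged in after the window
assigner via .trigger(new ThresholdTrigger(100.0)):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Hypothetical element type carrying the attribute the trigger inspects.
class Event {
    public double amount;
}

public class ThresholdTrigger extends Trigger<Event, TimeWindow> {

    private final double threshold;

    public ThresholdTrigger(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // keep the regular end-of-window timer registered as a fallback
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return element.amount > threshold ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}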

Thanks, vino.


2018-07-27 19:40 GMT+08:00 chrisr123 :

> I want to get some experience implementing a custom window assigner,
> trigger,
> evictor, etc.
> Does anyone have an example of a custom window implementation that I could
> look at, or an
> idea for one to implement? The goal is to learn the custom window API. I'm
> looking for something
> besides a time or count window. The trigger would be based on some
> attribute(s) of the data.
> Thanks!
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: How to connect more than 2 heterogeneous Streams!!

2018-07-27 Thread Hequn Cheng
Hi Puneet,

Flink doesn't support connecting more than 2 streams with different schemas.
There are two ways that I think might help you.
1. unify the schema and use union.
2. use multiple joins to join the different streams.

Hope this helps.
Hequn

On Thu, Jul 26, 2018 at 2:29 PM, Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi
>
> Is there a way to connect more than 2 streams with different stream schema
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


Custom Window example (data-based)

2018-07-27 Thread chrisr123
I want to get some experience implementing a custom window assigner, trigger,
evictor, etc.
Does anyone have an example of a custom window implementation that I could
look at, or an
idea for one to implement? The goal is to learn the custom window API. I'm
looking for something
besides a time or count window. The trigger would be based on some
attribute(s) of the data.
Thanks!




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink TaskManager binding data port to single interface while rpc port binds to all?

2018-07-27 Thread David Corley
We're seeing an issue with our Flink TMs (1.4.2) where we explicitly set
the TM data and RPC ports. When the TM comes up, we see the following
bindings:

==
tcp6   0  0 :::9249 :::*LISTEN
2284/java
tcp6   0  0 10.141.34.17:6121   :::*LISTEN
2284/java
tcp6   0  0 :::6122 :::*LISTEN
2284/java
==

9249 is our Prometheus listener port
6121 is our data port
6122 is our RPC port

As you can see both the prometheus and rpc ports are bound to all
interfaces, but the data port is bound to a specific interface. The problem
is that it seems to be unpredictable which interface is chosen. We use
Docker, and we find that _sometimes_ the TM binds to the Docker interface,
and _sometimes_ it binds to the host interface, but only for the data port.

Is this expected? Is there any way to _force_ the TM to use a specific
interface/address?


Re: Checkpointing not happening in Standalone HA mode

2018-07-27 Thread Vinay Patil
Hi Vino,

Yes, the job runs successfully; however, no checkpoints are successful. I
will update the source.

Regards,
Vinay Patil


On Fri, Jul 27, 2018 at 2:00 PM vino yang  wrote:

> Hi Vinay,
>
> Oh! You use a collection source? That's the problem. Please use a general
> source like Kafka or others. Maybe your checkpoint has not been triggered
> because your job has stopped.
>
> Thanks, vino.
>
> 2018-07-27 16:07 GMT+08:00 Vinay Patil :
>
>> Hi Vino,
>>
>> Yes I am enabling checkpoint in the code as follows :
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.createRemoteEnvironment(",,getJobConfiguration(),jarPath");
>>
>>
>> env.enableCheckpointing(1000);
>>
>> env.setStateBackend(new 
>> FsStateBackend("file:///"));
>>
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>>
>>
>> In getJobConfiguration method I have set HA related properties like
>> HA_STORAGE_PATH,HA_ZOOKEEPER_QUORUM,HA_ZOOKEEPER_ROOT,HA_MODE,HA_JOB_MANAGER_PORT_RANGE,HA_CLUSTER_ID
>>
>>
>> I can see the error in Job Manager logs where it says Collection Source
>> is not being executed at the moment. Aborting checkpoint. In the pipeline I
>> have a stream initialized using "fromCollection". I think I will have to
>> get rid of this.
>>
>> What do you suggest
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Thu, Jul 26, 2018 at 12:04 PM vino yang  wrote:
>>
>>> Hi Vinay:
>>>
>>> Did you call specific config API refer to this documentation[1];
>>>
>>> Can you share your job program and JM Log? Or the JM log contains the
>>> log message like this pattern "Triggering checkpoint {} @ {} for job {}."?
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-25 19:43 GMT+08:00 Chesnay Schepler :
>>>
 Can you provide us with the job code?

 I assume that checkpointing runs properly if you submit the same job to
 a normal cluster?


 On 25.07.2018 13:15, Vinay Patil wrote:

 No error in the logs. That is why I am not able to understand why
 checkpoints are not getting triggered.

 Regards,
 Vinay Patil


 On Wed, Jul 25, 2018 at 4:44 PM Vinay Patil 
 wrote:

> Hi Chesnay,
>
> No error in the logs. That is why I am not able to understand why
> checkpoints are not getting triggered.
>
> Regards,
> Vinay Patil
>
>
> On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler 
> wrote:
>
>> Please check the job- and taskmanager logs for anything suspicious.
>>
>> On 25.07.2018 12:33, Vinay Patil wrote:
>>
>> Hi,
>>
>> I am starting the cluster using bootstrap application where in I am
>> calling Job Manager and Task Manager main class to form the cluster. The 
>> HA
>> cluster is formed correctly and I am able to submit jobs to this cluster
>> using RemoteExecutionEnvironment but when I enable checkpointing in code 
>> I
>> do not see any checkpoints triggered on Flink UI.
>>
>> Am I missing any configurations to be set for the
>> RemoteExecutionEnvironment for checkpointing to work.
>>
>>
>> Regards,
>> Vinay Patil
>>
>>
>>

>>>
>


Re: Checkpointing not happening in Standalone HA mode

2018-07-27 Thread vino yang
Hi Vinay,

Oh! You use a collection source? That's the problem. Please use a general
source like Kafka or others. Maybe your checkpoint has not been triggered
because your job has stopped.
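
To illustrate why: a fromCollection() source finishes almost immediately, and
checkpoints are aborted once a task is no longer running (hence the
"Collection Source is not being executed at the moment. Aborting checkpoint"
message). A minimal sketch (the LoopingStringSource name and emitted values
are hypothetical) of a source that keeps running, so checkpoints can be
triggered while you test:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LoopingStringSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long i = 0;
        while (running) {
            // emit under the checkpoint lock so emission and checkpointing do not interleave
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("element-" + i++);
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}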

Thanks, vino.

2018-07-27 16:07 GMT+08:00 Vinay Patil :

> Hi Vino,
>
> Yes I am enabling checkpoint in the code as follows :
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createRemoteEnvironment(",,getJobConfiguration(),jarPath");
>
>
> env.enableCheckpointing(1000);
>
> env.setStateBackend(new 
> FsStateBackend("file:///"));
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>
>
> In getJobConfiguration method I have set HA related properties like
> HA_STORAGE_PATH,HA_ZOOKEEPER_QUORUM,HA_ZOOKEEPER_ROOT,HA_
> MODE,HA_JOB_MANAGER_PORT_RANGE,HA_CLUSTER_ID
>
>
> I can see the error in Job Manager logs where it says Collection Source is
> not being executed at the moment. Aborting checkpoint. In the pipeline I
> have a stream initialized using "fromCollection". I think I will have to
> get rid of this.
>
> What do you suggest
>
> Regards,
> Vinay Patil
>
>
> On Thu, Jul 26, 2018 at 12:04 PM vino yang  wrote:
>
>> Hi Vinay:
>>
>> Did you call specific config API refer to this documentation[1];
>>
>> Can you share your job program and JM Log? Or the JM log contains the log
>> message like this pattern "Triggering checkpoint {} @ {} for job {}."?
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-
>> release-1.5/dev/stream/state/checkpointing.html#enabling-
>> and-configuring-checkpointing
>>
>> Thanks, vino.
>>
>> 2018-07-25 19:43 GMT+08:00 Chesnay Schepler :
>>
>>> Can you provide us with the job code?
>>>
>>> I assume that checkpointing runs properly if you submit the same job to
>>> a normal cluster?
>>>
>>>
>>> On 25.07.2018 13:15, Vinay Patil wrote:
>>>
>>> No error in the logs. That is why I am not able to understand why
>>> checkpoints are not getting triggered.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>> On Wed, Jul 25, 2018 at 4:44 PM Vinay Patil 
>>> wrote:
>>>
 Hi Chesnay,

 No error in the logs. That is why I am not able to understand why
 checkpoints are not getting triggered.

 Regards,
 Vinay Patil


 On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler 
 wrote:

> Please check the job- and taskmanager logs for anything suspicious.
>
> On 25.07.2018 12:33, Vinay Patil wrote:
>
> Hi,
>
> I am starting the cluster using bootstrap application where in I am
> calling Job Manager and Task Manager main class to form the cluster. The 
> HA
> cluster is formed correctly and I am able to submit jobs to this cluster
> using RemoteExecutionEnvironment but when I enable checkpointing in code I
> do not see any checkpoints triggered on Flink UI.
>
> Am I missing any configurations to be set for the
> RemoteExecutionEnvironment for checkpointing to work.
>
>
> Regards,
> Vinay Patil
>
>
>
>>>
>>


Re: Checkpointing not happening in Standalone HA mode

2018-07-27 Thread Vinay Patil
Hi Vino,

Yes I am enabling checkpoint in the code as follows :

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment(",,getJobConfiguration(),jarPath");


env.enableCheckpointing(1000);

env.setStateBackend(new FsStateBackend("file:///"));

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);


In getJobConfiguration method I have set HA related properties like
HA_STORAGE_PATH,HA_ZOOKEEPER_QUORUM,HA_ZOOKEEPER_ROOT,HA_MODE,HA_JOB_MANAGER_PORT_RANGE,HA_CLUSTER_ID


I can see the error in Job Manager logs where it says Collection Source is
not being executed at the moment. Aborting checkpoint. In the pipeline I
have a stream initialized using "fromCollection". I think I will have to
get rid of this.

What do you suggest

Regards,
Vinay Patil


On Thu, Jul 26, 2018 at 12:04 PM vino yang  wrote:

> Hi Vinay:
>
> Did you call specific config API refer to this documentation[1];
>
> Can you share your job program and JM Log? Or the JM log contains the log
> message like this pattern "Triggering checkpoint {} @ {} for job {}."?
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
>
> Thanks, vino.
>
> 2018-07-25 19:43 GMT+08:00 Chesnay Schepler :
>
>> Can you provide us with the job code?
>>
>> I assume that checkpointing runs properly if you submit the same job to a
>> normal cluster?
>>
>>
>> On 25.07.2018 13:15, Vinay Patil wrote:
>>
>> No error in the logs. That is why I am not able to understand why
>> checkpoints are not getting triggered.
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Wed, Jul 25, 2018 at 4:44 PM Vinay Patil 
>> wrote:
>>
>>> Hi Chesnay,
>>>
>>> No error in the logs. That is why I am not able to understand why
>>> checkpoints are not getting triggered.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>> On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler 
>>> wrote:
>>>
 Please check the job- and taskmanager logs for anything suspicious.

 On 25.07.2018 12:33, Vinay Patil wrote:

 Hi,

 I am starting the cluster using bootstrap application where in I am
 calling Job Manager and Task Manager main class to form the cluster. The HA
 cluster is formed correctly and I am able to submit jobs to this cluster
 using RemoteExecutionEnvironment but when I enable checkpointing in code I
 do not see any checkpoints triggered on Flink UI.

 Am I missing any configurations to be set for the
 RemoteExecutionEnvironment for checkpointing to work.


 Regards,
 Vinay Patil



>>
>


Flink on kubernetes: taskmanager error

2018-07-27 Thread vipul singh
Hello,

I am trying to run Flink on a Kubernetes cluster using minikube and
kubectl. I am following this example, which runs a Flink 1.2 cluster OK.

I am interested in running Flink 1.5.1, but when I modify the Flink
version, I start to see these exceptions in the taskmanager-controller logs.
The exceptions are below:

2018-07-27 07:34:22,429 INFO  org.apache.flink.core.fs.FileSystem
> - Hadoop is not in the classpath/dependencies. The
> extended set of supported File Systems via Hadoop is not available.
>
> 2018-07-27 07:34:22,442 INFO
> org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
> create Hadoop Security Module because Hadoop cannot be found in the
> Classpath.
>
> 2018-07-27 07:34:22,460 INFO  org.apache.flink.runtime.security.SecurityUtils
>   - Cannot install HadoopSecurityContext because Hadoop
> cannot be found in the Classpath.
>
> 2018-07-27 07:34:22,622 WARN  org.apache.flink.configuration.Configuration
> - Config uses deprecated configuration key
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
>
> 2018-07-27 07:34:22,626 INFO
> org.apache.flink.runtime.util.LeaderRetrievalUtils- Trying to
> select the network interface and address to use by connecting to the
> leading JobManager.
>
> 2018-07-27 07:34:22,626 INFO
> org.apache.flink.runtime.util.LeaderRetrievalUtils-
> TaskManager will try to connect for 1 milliseconds before falling back
> to heuristics
>
> 2018-07-27 07:34:22,629 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Retrieved new target address
> taskmanager-controller-vncdz/172.17.0.7:6123.
>
> 2018-07-27 07:34:23,390 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Trying to connect to address
> taskmanager-controller-vncdz/172.17.0.7:6123
>
> 2018-07-27 07:34:23,391 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address
> 'taskmanager-controller-vncdz/172.17.0.7': Connection refused (Connection
> refused)
>
> 2018-07-27 07:34:23,391 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:23,392 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:23,392 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/127.0.0.1': Connection
> refused (Connection refused)
>
> 2018-07-27 07:34:23,393 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:23,393 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/127.0.0.1': Connection
> refused (Connection refused)
>
> 2018-07-27 07:34:24,195 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Trying to connect to address
> taskmanager-controller-vncdz/172.17.0.7:6123
>
> 2018-07-27 07:34:24,196 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address
> 'taskmanager-controller-vncdz/172.17.0.7': Connection refused (Connection
> refused)
>
> 2018-07-27 07:34:24,197 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:24,198 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:24,198 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/127.0.0.1': Connection
> refused (Connection refused)
>
> 2018-07-27 07:34:24,199 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:24,199 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/127.0.0.1': Connection
> refused (Connection refused)
>
> 2018-07-27 07:34:25,803 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Trying to connect to address
> taskmanager-controller-vncdz/172.17.0.7:6123
>
> 2018-07-27 07:34:25,811 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address
> 'taskmanager-controller-vncdz/172.17.0.7': Connection refused (Connection
> refused)
>
> 2018-07-27 07:34:25,811 INFO  org.apache.flink.runtime.net.ConnectionUtils
> - Failed to connect from address '/172.17.0.7':
> Connection refused (Connection refused)
>
> 2018-07-27 07:34:25,812 INFO  org.apach