'Custom' mapping function on keyed WindowedStream

2018-02-26 Thread Marchant, Hayden
I would like to create a custom aggregator function for a windowed KeyedStream over
which I have complete control - i.e. instead of implementing an AggregateFunction, I
would like to control the lifecycle of the Flink state myself by implementing the
CheckpointedFunction interface, though I still want this state to be per-key,
per-window.

I am not sure which function I should be calling on the WindowedStream in order 
to invoke this custom functionality. I see from the documentation that 
CheckpointedFunction is for non-keyed state - which I guess eliminates this 
option.

A little background - I have logic that needs to hold a very large state in the
operator - lots of counts by sub-key. Since only a subset of these aggregations is
updated at a time, I was interested in trying out incremental checkpointing with
RocksDB. However, I don't want to hit RocksDB I/O on every update of the state since we
need very low latency, and instead wanted to hold the state on the Java heap and then
update the Flink state on checkpoint - i.e. something like CheckpointedFunction.
My assumption is that any update I make to RocksDB-backed state will hit the local
disk - if this is wrong, I'll be happy.

What other options do I have?
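
For illustration, a minimal sketch of one direction (assuming the Flink 1.4 DataStream
API; the class, element type T, and the counting logic are placeholders):
ProcessWindowFunction exposes per-key, per-window keyed state through its Context, so no
CheckpointedFunction is needed. It does not by itself keep the state on the Java heap -
that is still decided by the configured state backend - but it shows where per-key,
per-window state can live. It would be attached with
keyBy(...).window(...).process(new CountingWindowFunction<>()).

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Counts elements per key and per window using state scoped to that key/window pair.
public class CountingWindowFunction<T> extends ProcessWindowFunction<T, Long, String, TimeWindow> {

    @Override
    public void process(String key, Context ctx, Iterable<T> elements, Collector<Long> out)
            throws Exception {
        // windowState() is scoped to this key AND this window; globalState() is per-key only.
        ValueState<Long> count = ctx.windowState()
                .getState(new ValueStateDescriptor<>("count", Long.class));
        long current = count.value() == null ? 0L : count.value();
        for (T ignored : elements) {
            current++;
        }
        count.update(current);
        out.collect(current);
    }
}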

Thanks,
Hayden Marchant



RE: Kafka as source for batch job

2018-02-08 Thread Marchant, Hayden
Forgot to mention that my target Kafka version is 0.11.x, with the aim to upgrade to
1.0 when a 1.0.x fixpack is released.

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Thursday, February 08, 2018 8:05 PM
To: user@flink.apache.org; Marchant, Hayden [ICG-IT] 
Subject: RE: Kafka as source for batch job

Hi Marchant,

Yes, I agree. In general, the isEndOfStream method has very ill-defined semantics, with
different behaviors across the different Kafka connector versions.
This method will definitely need to be revisited in the future (we are thinking about a
rework of the connector).

What is your target Kafka version? And do you know the ending offsets of _all_ the
partitions you want to consume a range of?
I can probably double-check for you whether your specific case is possible, given the
above information.

Cheers,
Gordon


On 8 February 2018 at 3:22:24 PM, Marchant, Hayden (hayden.march...@citi.com) wrote:
Gordon,

Thanks for the pointer. I did some searches for usages of isEndOfStream and it's a
little confusing. I see that all implementors of DeserializationSchema must implement
this method, but it's not called from anywhere central in the Flink streaming engine;
rather, each source can decide to use it in its own implementation - for example, the
Kafka source stops processing the topic when isEndOfStream returns true. This is nice,
but it localizes the treatment to that operator, and even though it goes a long way
towards ensuring that I get just my bounded data, it still does not give me the ability
to stop my job when I have finished consuming the elements.

Also, in my case I need to ensure that I have reached a certain offset for each of the
Kafka partitions that are assigned to the instance of the source function. It seems
from the code that I would need a different implementation of KafkaFetcher.runFetchLoop
with slightly different logic for setting running to false.
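
For reference, a rough sketch of the isEndOfStream route (assuming the Flink 1.4 Kafka
connector, end offsets known up front, and a made-up Tuple3 payload): the schema tags
each record with its partition and offset so the check can look up a per-partition end
offset. Note that returning true still ends consumption for the whole source subtask, so
on its own this does not give the "reach every partition's end offset" behavior
described above.

import java.io.IOException;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Emits (partition, offset, value) so isEndOfStream can compare each record against
// a per-partition end offset that is supplied before the job starts.
public class BoundedRangeSchema implements KeyedDeserializationSchema<Tuple3<Integer, Long, String>> {

    private final Map<Integer, Long> endOffsets;  // partition -> last offset to consume

    public BoundedRangeSchema(Map<Integer, Long> endOffsets) {
        this.endOffsets = endOffsets;
    }

    @Override
    public Tuple3<Integer, Long, String> deserialize(byte[] messageKey, byte[] message,
            String topic, int partition, long offset) throws IOException {
        return Tuple3.of(partition, offset, new String(message, "UTF-8"));
    }

    @Override
    public boolean isEndOfStream(Tuple3<Integer, Long, String> next) {
        // Returning true stops the whole source subtask, not just this partition.
        Long end = endOffsets.get(next.f0);
        return end != null && next.f1 >= end;
    }

    @Override
    public TypeInformation<Tuple3<Integer, Long, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple3<Integer, Long, String>>() {});
    }
}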

What would you recommend in this case?

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Thursday, February 08, 2018 12:24 PM
To: user@flink.apache.org; Marchant, Hayden [ICG-IT] (hm97...@imceu.eu.ssmb.com)
Subject: Re: Kafka as source for batch job

Hi Hayden,

Have you tried looking into `KeyedDeserializationSchema#isEndOfStream` [1]?
I think that could be what you are looking for. It signals the end of the 
stream when consuming from Kafka.

Cheers,
Gordon


On 8 February 2018 at 10:44:59 AM, Marchant, Hayden (hayden.march...@citi.com) wrote:
I know that traditionally Kafka is used as a source for a streaming job. In our
particular case, we are looking at extracting records from a Kafka topic from a
particular well-defined offset range (per partition) - i.e. from offset X to offset Y.
In this case, we'd somehow want the application to know that it has finished when it
gets to offset Y. This basically changes the Kafka stream into bounded data, as opposed
to the unbounded data of the usual streaming paradigm.

What would be the best approach to do this in Flink? I see a few options, 
though there might be more:

1. Use a regular streaming job, and have some external service that monitors the
current offsets of the topic's consumer group and manually stops the job when the
consumer group has finished the range.
Pros - simple w.r.t. Flink; Cons - hacky

2. Create a batch job, and a new InputFormat based on Kafka that reads the specified
subset of the Kafka topic into the source.
Pros - represents bounded data from a Kafka topic as a batch source; Cons - requires
implementation of a source.

3. Dump the subset of Kafka into a file and then trigger a more 'traditional' Flink
batch job that reads from the file.
Pros - simple; Cons - unnecessary I/O.

I personally prefer 1 and 3 for simplicity. Has anyone done anything like this 
before?

Thanks,
Hayden Marchant


RE: Kafka as source for batch job

2018-02-08 Thread Marchant, Hayden
Hi Gordon,

Actually, our use case is that we have a start/end timestamp, and we plan on calling
KafkaConsumer.offsetsForTimes to get the offsets for each partition. So I guess our
logic is different in that we have an 'and' predicate across partitions - every
partition must reach its end offset - as opposed to the current 'or' predicate, where
any partition that fulfills the condition is enough to stop the job.
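
For what it's worth, a small sketch of the timestamp-to-offset step using the plain
Kafka 0.11 client directly (the class name and parameters are placeholders, not Flink
API): offsetsForTimes returns, per partition, the earliest offset whose timestamp is at
or after the requested timestamp, which could serve as the per-partition end offsets
for whatever stopping logic the job ends up using.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetLookup {

    // Returns partition -> end offset for the given topic, derived from an end timestamp.
    public static Map<Integer, Long> lookup(String bootstrapServers, String topic, long endTimestampMillis) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, Long> query = new HashMap<>();
            for (PartitionInfo p : consumer.partitionsFor(topic)) {
                query.put(new TopicPartition(topic, p.partition()), endTimestampMillis);
            }
            // Null entries mean no record at or after the timestamp exists yet for that partition.
            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

            Map<Integer, Long> endOffsets = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : result.entrySet()) {
                if (e.getValue() != null) {
                    endOffsets.put(e.getKey().partition(), e.getValue().offset());
                }
            }
            return endOffsets;
        }
    }
}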

Either way, I’d still need to figure out when to stop the job.

Would it make more sense to implement an InputFormat that could wrap this 'bounded'
Kafka source, and use the DataSet / Batch Table API?

Thanks
Hayden

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Thursday, February 08, 2018 8:05 PM
To: user@flink.apache.org; Marchant, Hayden [ICG-IT] 
Subject: RE: Kafka as source for batch job

Hi Marchant,

Yes, I agree. In general, the isEndOfStream method has very ill-defined semantics, with
different behaviors across the different Kafka connector versions.
This method will definitely need to be revisited in the future (we are thinking about a
rework of the connector).

What is your target Kafka version? And do you know the ending offsets of _all_ the
partitions you want to consume a range of?
I can probably double-check for you whether your specific case is possible, given the
above information.

Cheers,
Gordon


On 8 February 2018 at 3:22:24 PM, Marchant, Hayden (hayden.march...@citi.com) wrote:
Gordon,

Thanks for the pointer. I did some searches for usages of isEndOfStream and it's a
little confusing. I see that all implementors of DeserializationSchema must implement
this method, but it's not called from anywhere central in the Flink streaming engine;
rather, each source can decide to use it in its own implementation - for example, the
Kafka source stops processing the topic when isEndOfStream returns true. This is nice,
but it localizes the treatment to that operator, and even though it goes a long way
towards ensuring that I get just my bounded data, it still does not give me the ability
to stop my job when I have finished consuming the elements.

Also, in my case I need to ensure that I have reached a certain offset for each of the
Kafka partitions that are assigned to the instance of the source function. It seems
from the code that I would need a different implementation of KafkaFetcher.runFetchLoop
with slightly different logic for setting running to false.

What would you recommend in this case?

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Thursday, February 08, 2018 12:24 PM
To: user@flink.apache.org; Marchant, Hayden [ICG-IT] (hm97...@imceu.eu.ssmb.com)
Subject: Re: Kafka as source for batch job

Hi Hayden,

Have you tried looking into `KeyedDeserializationSchema#isEndOfStream` [1]?
I think that could be what you are looking for. It signals the end of the 
stream when consuming from Kafka.

Cheers,
Gordon


On 8 February 2018 at 10:44:59 AM, Marchant, Hayden (hayden.march...@citi.com) wrote:
I know that traditionally Kafka is used as a source for a streaming job. In our
particular case, we are looking at extracting records from a Kafka topic from a
particular well-defined offset range (per partition) - i.e. from offset X to offset Y.
In this case, we'd somehow want the application to know that it has finished when it
gets to offset Y. This basically changes the Kafka stream into bounded data, as opposed
to the unbounded data of the usual streaming paradigm.

What would be the best approach to do this in Flink? I see a few options, 
though there might be more:

1. Use a regular streaming job, and have some external service that monitors the
current offsets of the topic's consumer group and manually stops the job when the
consumer group has finished the range.
Pros - simple w.r.t. Flink; Cons - hacky

2. Create a batch job, and a new InputFormat based on Kafka that reads the specified
subset of the Kafka topic into the source.
Pros - represents bounded data from a Kafka topic as a batch source; Cons - requires
implementation of a source.

3. Dump the subset of Kafka into a file and then trigger a more 'traditional' Flink
batch job that reads from the file.
Pros - simple; Cons - unnecessary I/O.

I personally prefer 1 and 3 for simplicity. Has anyone done anything like this 
before?

Thanks,
Hayden Marchant


RE: Kafka as source for batch job

2018-02-08 Thread Marchant, Hayden
Gordon,

Thanks for the pointer. I did some searches for usages of isEndOfStream and it's a
little confusing. I see that all implementors of DeserializationSchema must implement
this method, but it's not called from anywhere central in the Flink streaming engine;
rather, each source can decide to use it in its own implementation - for example, the
Kafka source stops processing the topic when isEndOfStream returns true. This is nice,
but it localizes the treatment to that operator, and even though it goes a long way
towards ensuring that I get just my bounded data, it still does not give me the ability
to stop my job when I have finished consuming the elements.

Also, in my case I need to ensure that I have reached a certain offset for each of the
Kafka partitions that are assigned to the instance of the source function. It seems
from the code that I would need a different implementation of KafkaFetcher.runFetchLoop
with slightly different logic for setting running to false.

What would you recommend in this case?

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Thursday, February 08, 2018 12:24 PM
To: user@flink.apache.org; Marchant, Hayden [ICG-IT] 
Subject: Re: Kafka as source for batch job

Hi Hayden,

Have you tried looking into `KeyedDeserializationSchema#isEndOfStream` [1]?
I think that could be what you are looking for. It signals the end of the 
stream when consuming from Kafka.

Cheers,
Gordon


On 8 February 2018 at 10:44:59 AM, Marchant, Hayden (hayden.march...@citi.com) wrote:
I know that traditionally Kafka is used as a source for a streaming job. In our
particular case, we are looking at extracting records from a Kafka topic from a
particular well-defined offset range (per partition) - i.e. from offset X to offset Y.
In this case, we'd somehow want the application to know that it has finished when it
gets to offset Y. This basically changes the Kafka stream into bounded data, as opposed
to the unbounded data of the usual streaming paradigm.

What would be the best approach to do this in Flink? I see a few options, 
though there might be more:

1. Use a regular streaming job, and have some external service that monitors the
current offsets of the topic's consumer group and manually stops the job when the
consumer group has finished the range.
Pros - simple w.r.t. Flink; Cons - hacky

2. Create a batch job, and a new InputFormat based on Kafka that reads the specified
subset of the Kafka topic into the source.
Pros - represents bounded data from a Kafka topic as a batch source; Cons - requires
implementation of a source.

3. Dump the subset of Kafka into a file and then trigger a more 'traditional' Flink
batch job that reads from the file.
Pros - simple; Cons - unnecessary I/O.

I personally prefer 1 and 3 for simplicity. Has anyone done anything like this 
before?

Thanks,
Hayden Marchant


Kafka as source for batch job

2018-02-08 Thread Marchant, Hayden
I know that traditionally Kafka is used as a source for a streaming job. In our
particular case, we are looking at extracting records from a Kafka topic from a
particular well-defined offset range (per partition) - i.e. from offset X to offset Y.
In this case, we'd somehow want the application to know that it has finished when it
gets to offset Y. This basically changes the Kafka stream into bounded data, as opposed
to the unbounded data of the usual streaming paradigm.

What would be the best approach to do this in Flink? I see a few options, 
though there might be more:

1. Use a regular streaming job, and have some external service that monitors the
current offsets of the topic's consumer group and manually stops the job when the
consumer group has finished the range.
   Pros - simple w.r.t. Flink; Cons - hacky

2. Create a batch job, and a new InputFormat based on Kafka that reads the specified
subset of the Kafka topic into the source.
   Pros - represents bounded data from a Kafka topic as a batch source; Cons - requires
implementation of a source.

3. Dump the subset of Kafka into a file and then trigger a more 'traditional' Flink
batch job that reads from the file.
   Pros - simple; Cons - unnecessary I/O.

I personally prefer 1 and 3 for simplicity. Has anyone done anything like this 
before?

Thanks,
Hayden Marchant



RE: S3 for state backend in Flink 1.4.0

2018-02-07 Thread Marchant, Hayden
We actually got it working. Essentially, it's an implementation of HadoopFileSystem,
and it was written with the idea that it can be used with Spark (since Spark has broader
adoption than Flink as of now). We managed to get it configured, and found the latency
to be much lower than with the s3 connector. There is a lot less copying etc. happening
under the hood when using this native API, which explains the better performance.

Happy to provide assistance offline if you're interested.

Thanks
Hayden

-Original Message-
From: Edward Rojas [mailto:edward.roja...@gmail.com] 
Sent: Thursday, February 01, 2018 6:09 PM
To: user@flink.apache.org
Subject: RE: S3 for state backend in Flink 1.4.0

Hi Hayden,

It seems like a good alternative. But I see it's intended to work with Spark - did you
manage to get it working with Flink?

I ran some tests, but I get several errors when trying to create a file, either for
checkpointing or for saving data.

Thanks in advance,
Regards,
Edward





RE: Latest version of Kafka

2018-02-07 Thread Marchant, Hayden
Thanks for the info!

-Original Message-
From: Piotr Nowojski [mailto:pi...@data-artisans.com] 
Sent: Friday, February 02, 2018 4:37 PM
To: Marchant, Hayden [ICG-IT] 
Cc: user@flink.apache.org
Subject: Re: Latest version of Kafka

Hi,

Flink for now provides only a connector for Kafka 0.11, which uses the KafkaClient in
version 0.11.x. However, you should be able to use it for reading from / writing to
Kafka 1.0 - Kafka claims (and as far as I know it's true) that Kafka 1.0 is backward
compatible with 0.11.

Piotrek

> On 1 Feb 2018, at 14:46, Marchant, Hayden  wrote:
> 
> What is the newest version of Kafka that is compatible with Flink 1.4.0? I 
> see the last version of Kafka supported is 0.11 , from documentation, but has 
> any testing been done with Kafka 1.0?
> 
> 
> Hayden Marchant
> 



RE: Joining data in Streaming

2018-02-07 Thread Marchant, Hayden
Thanks for all the ideas!!

From: Steven Wu [mailto:stevenz...@gmail.com]
Sent: Tuesday, February 06, 2018 3:46 AM
To: Stefan Richter 
Cc: Marchant, Hayden [ICG-IT] ; 
user@flink.apache.org; Aljoscha Krettek 
Subject: Re: Joining data in Streaming

There is also a discussion of side input
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

I would load the smaller data set as static reference data set. Then you can 
just do single source streaming of the larger data set.

On Wed, Jan 31, 2018 at 1:09 AM, Stefan Richter (s.rich...@data-artisans.com) wrote:
Hi,

if the workarounds that Xingcan and I mentioned are not options for your use case, then
I think this might currently be the better option. But I would expect some better
support for stream joins in the near future.

Best,
Stefan

> On 31.01.2018 at 07:04, Marchant, Hayden (hayden.march...@citi.com) wrote:
>
> Stefan,
>
> So are we essentially saying that in this case, for now, I should stick to 
> DataSet / Batch Table API?
>
> Thanks,
> Hayden
>
> -Original Message-
> From: Stefan Richter [mailto:s.rich...@data-artisans.com]
> Sent: Tuesday, January 30, 2018 4:18 PM
> To: Marchant, Hayden [ICG-IT] (hm97...@imceu.eu.ssmb.com)
> Cc: user@flink.apache.org; Aljoscha Krettek (aljos...@apache.org)
> Subject: Re: Joining data in Streaming
>
> Hi,
>
> as far as I know, this is not easily possible. What would be required is something
> like a CoFlatMap function, where one input stream blocks until the second stream is
> fully consumed to build up the state to join against. Maybe Aljoscha (in CC) can
> comment on future plans to support this.
>
> Best,
> Stefan
>
>> On 30.01.2018 at 12:42, Marchant, Hayden (hayden.march...@citi.com) wrote:
>>
>> We have a use case where we have 2 data sets - one reasonably large data set
>> (a few million entities), and a smaller set of data. We want to do a join 
>> between these data sets. We will be doing this join after both data sets are 
>> available.  In the world of batch processing, this is pretty straightforward 
>> - we'd load both data sets into an application and execute a join operator 
>> on them through a common key.   Is it possible to do such a join using the 
>> DataStream API? I would assume that I'd use the connect operator, though I'm 
>> not sure exactly how I should do the join - do I need one 'smaller' set to 
>> be completely loaded into state before I start flowing the large set? My 
>> concern is that if I read both data sets from streaming sources, since I 
>> can't be guaranteed of the order that the data is loaded, I may lose lots of 
>> potential joined entities since their pairs might not have been read yet.
>>
>>
>> Thanks,
>> Hayden Marchant
>>
>>
>



Latest version of Kafka

2018-02-01 Thread Marchant, Hayden
What is the newest version of Kafka that is compatible with Flink 1.4.0? I see from the
documentation that the last supported version of Kafka is 0.11, but has any testing
been done with Kafka 1.0?


Hayden Marchant



Reading bounded data from Kafka in Flink job

2018-02-01 Thread Marchant, Hayden
I have 2 datasets that I need to join together in a Flink batch job. One of the
datasets needs to be created dynamically by completely 'draining' a Kafka topic over an
offset range (start and end), and creating a file containing all messages in that
range. I know that in Flink streaming I can specify the start offset, but not the end
offset. In my case, this preparation of the file from the Kafka topic is really working
on a finite, bounded set of data, even though it comes from Kafka.

Is there a way that I can do this in Flink (either streaming or batch)?
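
One possible building block, sketched with the plain Kafka consumer rather than a Flink
source (class name, properties, and callback are placeholders): assign each partition
explicitly, seek to the start offset, and stop at the end offset. Something like this
could back either the file-preparation step or a custom InputFormat whose splits are
(partition, start, end) ranges.

import java.util.Collections;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionRangeReader {

    // Reads one partition from startOffset (inclusive) to endOffset (exclusive) and hands
    // each value to the callback. consumerProps must contain bootstrap.servers and
    // byte-array key/value deserializers.
    public static void readRange(Properties consumerProps, TopicPartition tp,
                                 long startOffset, long endOffset, Consumer<byte[]> callback) {
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.assign(Collections.singletonList(tp));  // manual assignment, no group rebalancing
            consumer.seek(tp, startOffset);
            long next = startOffset;
            while (next < endOffset) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records.records(tp)) {
                    if (record.offset() >= endOffset) {
                        return;  // reached the end of the requested range
                    }
                    callback.accept(record.value());
                    next = record.offset() + 1;
                }
            }
        }
    }
}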

Thanks,
Hayden





RE: S3 for state backend in Flink 1.4.0

2018-02-01 Thread Marchant, Hayden
Edward,

We are using Object Storage for checkpointing. I'd like to point out that we were
seeing performance problems using the S3 protocol. Btw, we had quite a few problems
using the flink-s3-fs-hadoop jar with Object Storage and had to do some ugly hacking to
get it working. We recently discovered an alternative connector developed by IBM
Research called Stocator. It's a streaming writer and performs better than using the S3
protocol.

Here is a link to the library - https://github.com/SparkTC/stocator, and a blog 
explaining about it - 
http://www.spark.tc/stocator-the-fast-lane-connecting-object-stores-to-spark/

Good luck!!

-Original Message-
From: Edward Rojas [mailto:edward.roja...@gmail.com] 
Sent: Wednesday, January 31, 2018 3:02 PM
To: user@flink.apache.org
Subject: RE: S3 for state backend in Flink 1.4.0

Hi,

We are having a similar problem when trying to use Flink 1.4.0 with IBM Object 
Storage for reading and writing data. 

We followed
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html
and the suggestion on
https://issues.apache.org/jira/browse/FLINK-851.

We put the flink-s3-fs-hadoop jar from the opt/ folder into the lib/ folder and we
added the configuration in flink-conf.yaml:

s3.access-key: 
s3.secret-key: 
s3.endpoint: s3.us-south.objectstorage.softlayer.net 

With this we can read from IBM Object Storage without any problem when using 
env.readTextFile("s3://flink-test/flink-test.txt");

But we are having problems when trying to write. 
We are using a kafka consumer to read from the bus, we're making some 
processing and after saving  some data on Object Storage.

When using stream.writeAsText("s3://flink-test/data.txt").setParallelism(1);
the file is created, but only when the job finishes (or we stop it). We need to save
the data without stopping the job, so we are trying to use a Sink.

But when using a BucketingSink, we get the error:
java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1196)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)


Do you have any idea how could we make it work using Sink?

Thanks,
Regards,

Edward





RE: Joining data in Streaming

2018-01-30 Thread Marchant, Hayden
Stefan,

So are we essentially saying that in this case, for now, I should stick to 
DataSet / Batch Table API?

Thanks,
Hayden

-Original Message-
From: Stefan Richter [mailto:s.rich...@data-artisans.com] 
Sent: Tuesday, January 30, 2018 4:18 PM
To: Marchant, Hayden [ICG-IT] 
Cc: user@flink.apache.org; Aljoscha Krettek 
Subject: Re: Joining data in Streaming

Hi,

as far as I know, this is not easily possible. What would be required is something like
a CoFlatMap function, where one input stream blocks until the second stream is fully
consumed to build up the state to join against. Maybe Aljoscha (in CC) can comment on
future plans to support this.

Best,
Stefan

> On 30.01.2018 at 12:42, Marchant, Hayden wrote:
> 
> We have a use case where we have 2 data sets - one reasonably large data set
> (a few million entities), and a smaller set of data. We want to do a join 
> between these data sets. We will be doing this join after both data sets are 
> available.  In the world of batch processing, this is pretty straightforward 
> - we'd load both data sets into an application and execute a join operator on 
> them through a common key.   Is it possible to do such a join using the 
> DataStream API? I would assume that I'd use the connect operator, though I'm 
> not sure exactly how I should do the join - do I need one 'smaller' set to be 
> completely loaded into state before I start flowing the large set? My concern 
> is that if I read both data sets from streaming sources, since I can't be 
> guaranteed of the order that the data is loaded, I may lose lots of potential 
> joined entities since their pairs might not have been read yet. 
> 
> 
> Thanks,
> Hayden Marchant
> 
> 



Joining data in Streaming

2018-01-30 Thread Marchant, Hayden
We have a use case where we have 2 data sets - one reasonably large data set (a few
million entities), and a smaller set of data. We want to do a join between these data
sets. We will be doing this join after both data sets are available. In the world of
batch processing, this is pretty straightforward - we'd load both data sets into an
application and execute a join operator on them through a common key. Is it possible to
do such a join using the DataStream API? I would assume that I'd use the connect
operator, though I'm not sure exactly how I should do the join - do I need one
'smaller' set to be completely loaded into state before I start flowing the large set?
My concern is that if I read both data sets from streaming sources, since I can't be
guaranteed of the order in which the data is loaded, I may lose lots of potential
joined entities since their pairs might not have been read yet.
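
To make the connect idea concrete, a rough sketch (made-up String payloads and a shared
String key; names are illustrative only), wired up as something like
smallStream.keyBy(0).connect(largeStream.keyBy(0)).flatMap(new StateJoin()): the smaller
side is kept in keyed state, and large-side elements that arrive before their partner
are buffered instead of dropped, which addresses the ordering concern.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Input 1 is the small (reference) side, input 2 the large side; both are (key, payload).
public class StateJoin
        extends RichCoFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>> {

    private ValueState<String> smallSide;          // reference record for this key
    private ListState<String> bufferedLargeSide;   // large-side records that arrived too early

    @Override
    public void open(Configuration parameters) {
        smallSide = getRuntimeContext().getState(new ValueStateDescriptor<>("small", String.class));
        bufferedLargeSide = getRuntimeContext().getListState(new ListStateDescriptor<>("buffer", String.class));
    }

    @Override
    public void flatMap1(Tuple2<String, String> small, Collector<Tuple2<String, String>> out) throws Exception {
        smallSide.update(small.f1);
        // Flush anything from the large side that was waiting for this key.
        Iterable<String> buffered = bufferedLargeSide.get();
        if (buffered != null) {
            for (String large : buffered) {
                out.collect(Tuple2.of(large, small.f1));
            }
        }
        bufferedLargeSide.clear();
    }

    @Override
    public void flatMap2(Tuple2<String, String> large, Collector<Tuple2<String, String>> out) throws Exception {
        String ref = smallSide.value();
        if (ref != null) {
            out.collect(Tuple2.of(large.f1, ref));
        } else {
            bufferedLargeSide.add(large.f1);
        }
    }
}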


Thanks,
Hayden Marchant




RE: S3 for state backend in Flink 1.4.0

2018-01-28 Thread Marchant, Hayden
I see that we can still use the other implementation, but we were hoping that we'd
benefit from the bug fix done in Flink 1.4.0 around 'repeated' loading of
configuration. I'll check with the old implementation and see if it still works.

We have also seen discussions of a more native connector called Stocator, which
interfaces directly with IBM Object Storage, can be configured through hdfs-site.xml,
and might speed things up.

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Thursday, January 25, 2018 6:30 PM
To: Marchant, Hayden [ICG-IT] 
Cc: user@flink.apache.org
Subject: Re: S3 for state backend in Flink 1.4.0

Hi,

Did you try overriding that config and it didn't work? That dependency is in 
fact still using the Hadoop S3 FS implementation but is shading everything to 
our own namespace so that there can't be any version conflicts. If that doesn't 
work then we need to look into this further.

The way you usually use this is by putting the flink-s3-fs-hadoop jar from the 
opt/ folder to the lib/ folder. I'm not sure including it as a dependency will 
work but it might. You also don't have to use flink-s3-fs-hadoop dependency if 
using the regular Hadoop S3 support worked for you before. It's only an 
additional option.

Best,
Aljoscha

> On 24. Jan 2018, at 16:33, Marchant, Hayden  wrote:
> 
> Hi,
> 
> We have a Flink Streaming application that uses S3 for storing checkpoints. 
> We are not using 'regular' S3, but rather IBM Object Storage which has an 
> S3-compatible connector. We had quite some challenges in overriding the
> endpoint from the default s3.amazonaws.com to our internal IBM Object
> Storage endpoint. In 1.3.2, we managed to get this working by providing our
> own jets3t.properties file that overrode s3service.s3-endpoint
> (https://jets3t.s3.amazonaws.com/toolkit/configuration.html)
> 
> When upgrading to 1.4.0, we added a dependency on the flink-s3-fs-hadoop
> artifact. It seems that our overriding with jets3t.properties is no longer
> relevant since it does not use the Hadoop implementation anymore.
> 
> Is there a way to override this default endpoint, or can we use the Presto endpoint
> for this? Please note that if we provide the endpoint in the URL for the state
> backend, it simply appends s3.amazonaws.com to the URL. For example
> s3://myobjectstorageendpoint.s3.amazonaws.com.
> 
> Are there any other solutions such as to 'rollback' to the Hadoop 
> implementation of S3?
> 
> Thanks,
> Hayden



S3 for state backend in Flink 1.4.0

2018-01-24 Thread Marchant, Hayden
Hi,

We have a Flink Streaming application that uses S3 for storing checkpoints. We are not
using 'regular' S3, but rather IBM Object Storage, which has an S3-compatible
connector. We had quite some challenges in overriding the endpoint from the default
s3.amazonaws.com to our internal IBM Object Storage endpoint. In 1.3.2, we managed to
get this working by providing our own jets3t.properties file that overrode
s3service.s3-endpoint (https://jets3t.s3.amazonaws.com/toolkit/configuration.html).

When upgrading to 1.4.0, we added a dependency on the flink-s3-fs-hadoop artifact. It
seems that our overriding with jets3t.properties is no longer relevant since it does
not use the Hadoop implementation anymore.

Is there a way to override this default endpoint, or can we use the Presto endpoint for
this? Please note that if we provide the endpoint in the URL for the state backend, it
simply appends s3.amazonaws.com to the URL. For example
s3://myobjectstorageendpoint.s3.amazonaws.com.

Are there any other solutions such as to 'rollback' to the Hadoop 
implementation of S3?

Thanks,
Hayden


Hardware Reference Architecture

2017-12-07 Thread Marchant, Hayden
Hi,

I'm looking for guidelines for Reference architecture for Hardware for a 
small/medium Flink cluster - we'll be installing on in-house bare-metal 
servers. I'm looking for guidance for:

1. Number and spec of  CPUs
2. RAM
3. Disks
4. Network
5. Proximity of servers to each other

(Most likely, we will choose YARN as a cluster manager for Flink)

If someone can share a document or link with relevant information, I will be 
very grateful.

Thanks,
Hayden Marchant



TaskManager HA on YARN

2017-12-04 Thread Marchant, Hayden
Hi,

We are currently starting to test Flink running on YARN. Until now, we've been testing
on a standalone cluster. One thing lacking in standalone mode is that we have to
manually restart a Task Manager if it dies. I looked at
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#yarn-cluster-high-availability
and see that YARN deals with HA for the Job Manager. How does it deal with a Task
Manager if it dies? I would like the Task Manager to be dealt with similarly to the Job
Manager on failure. For example, let's say I have a cluster with two Task Managers, and
one Task Manager dies. Will YARN restart the dead Task Manager, or would that need to
be a manual restart?

What actually would happen in the above case?

Thanks,
Hayden




Garbage collection concerns with Task Manager memory

2017-10-18 Thread Marchant, Hayden
I read in the Flink documentation that the TaskManager runs all tasks within its own
JVM, and that the recommendation is to set taskmanager.heap.mb to as much as is
available on the server. I have a very large server with 192GB, so I am thinking of
giving most of it to the Task Manager.

I recall that there are concerns about long stop-the-world garbage collection pauses
when allocating too much memory to a JVM - is this still a concern with G1?


Thanks,
Hayden Marchant





start-cluster.sh not working in HA mode

2017-10-16 Thread Marchant, Hayden
I am attempting to run Flink 1.3.2 in HA mode with zookeeper.

When I run start-cluster.sh, the job manager is not started, even though the task
manager is started. When I delved into this, I saw that the command:

ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l 
\"${FLINK_BIN_DIR}/jobmanager.sh\" start cluster ${master} ${webuiport} &"

is not actually running anything on the host. i.e. I do not see "Starting 
jobmanager daemon on host ."

Only when I remove ALL quotes, do I see it working. i.e. if I run:

ssh -n $FLINK_SSH_OPTS $master -- nohup /bin/bash -l 
${FLINK_BIN_DIR}/jobmanager.sh start cluster ${master} ${webuiport} &

I see that it manages to run the job manager - I see " Starting jobmanager 
daemon on host.".

Did anyone else experience a similar problem? Any elegant workarounds without 
having to change source code?

Thanks,
Hayden Marchant



RE: In-memory cache

2017-10-02 Thread Marchant, Hayden
Nice idea. Actually we are looking at connect for other parts of our solution 
in which the latency is less critical.

A few considerations for not using 'connect' in this case were:


1.   To isolate the two streams from each other to reduce complexity, simplify
debugging, etc. - since we are newbies at Flink, I was thinking that it is beneficial
to keep each stream as simple as possible, and if need be, we can interface between
them to 'exchange data'.

2.   The reference data, even though quite small, is updated every 100ms. Since we
would need this reference data on each 'consuming' operator instance, we would
essentially nearly double the amount of tuples coming through the operator. Since low
latency is key here, this was a concern, the assumption being that the two sides of the
'connect' share the same resources - whereas a background thread updating a 'map' would
not be competing with the incoming tuples.

I realize that structurally, connect is a neater solution.

If I can be convinced that my above concerns are unfounded, I’ll be happy to 
try that direction.

Thanks
Hayden

From: Stavros Kontopoulos [mailto:st.kontopou...@gmail.com]
Sent: Monday, October 02, 2017 2:24 PM
To: Marchant, Hayden [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: In-memory cache

How about connecting two streams of data, one from the reference data and one from the
main data (I assume using keyed streams, as you mention QueryableState), and keeping
state locally within the operator?
The idea is to have a local sub-copy of the reference data within the operator that is
updated from the source of the reference data. The reference data is still updated
externally from that low-latency Flink app. Here is a relevant question:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-state-in-connected-streams-td8727.html
Would that help?

Stavros



On Mon, Oct 2, 2017 at 1:46 PM, Marchant, Hayden (hayden.march...@citi.com) wrote:
We have an operator in our streaming application that needs to access 
'reference data' that is updated by another Flink streaming application. This 
reference data has about ~10,000 entries and has a small footprint. This 
reference data needs to be updated ~ every 100 ms. The required latency for  
this application is extremely low ( a couple of milliseconds), and we are 
therefore cautious of paying cost of I/O to access the reference data remotely. 
We are currently examining 3 different options for accessing this reference 
data:

1. Expose the reference data as QueryableState and access it directly from the 
'client' streaming operator using the QueryableState API
2. same as #1, but create an In-memory Java cache of the reference data within 
the operator that is asynchronously updated at a scheduled frequency using the 
QueryableState API
3. Output the reference data to Redis, and create an in-memory java cache of 
the reference data within the operator that is asynchronously updated at a 
scheduled frequency using Redis API.

My understanding is that one of the cons of using Queryable state, is that if 
the Flink application that generates the reference data is unavailable, the 
Queryable state will not exist - is that correct?

If we were to use an asynchronously scheduled 'read' from the distributed 
cache, where should it be done? I was thinking of using 
ScheduledExecutorService from within the open method of the Flink operator.

What is the best way to get this done?

Regards,
Hayden Marchant



In-memory cache

2017-10-02 Thread Marchant, Hayden
We have an operator in our streaming application that needs to access 'reference data'
that is updated by another Flink streaming application. This reference data has about
~10,000 entries and a small footprint, and it needs to be updated roughly every 100 ms.
The required latency for this application is extremely low (a couple of milliseconds),
and we are therefore wary of paying the I/O cost of accessing the reference data
remotely. We are currently examining 3 different options for accessing this reference
data:

1. Expose the reference data as QueryableState and access it directly from the 
'client' streaming operator using the QueryableState API
2. same as #1, but create an In-memory Java cache of the reference data within 
the operator that is asynchronously updated at a scheduled frequency using the 
QueryableState API
3. Output the reference data to Redis, and create an in-memory java cache of 
the reference data within the operator that is asynchronously updated at a 
scheduled frequency using Redis API. 

My understanding is that one of the cons of using Queryable state, is that if 
the Flink application that generates the reference data is unavailable, the 
Queryable state will not exist - is that correct?

If we were to use an asynchronously scheduled 'read' from the distributed 
cache, where should it be done? I was thinking of using 
ScheduledExecutorService from within the open method of the Flink operator.
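
As a concrete illustration of that last idea, a bare-bones sketch (the class is
hypothetical, and the actual fetch - Redis, a QueryableState client, or anything else -
is left abstract): the refresh runs on a background thread started in open(), so map()
only ever reads a local map.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public abstract class ReferenceDataEnricher<IN, OUT> extends RichMapFunction<IN, OUT> {

    private transient volatile Map<String, String> referenceData;
    private transient ScheduledExecutorService refresher;

    @Override
    public void open(Configuration parameters) {
        referenceData = new ConcurrentHashMap<>();
        refresher = Executors.newSingleThreadScheduledExecutor();
        // Refresh every 100 ms off the processing thread; map() never blocks on I/O.
        refresher.scheduleAtFixedRate(() -> referenceData = fetchReferenceData(),
                0, 100, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        if (refresher != null) {
            refresher.shutdownNow();
        }
    }

    // Subclasses call this from map() to look up reference entries in local memory.
    protected Map<String, String> currentReferenceData() {
        return referenceData;
    }

    // Placeholder: pull the ~10,000 entries from the external store of your choice.
    protected abstract Map<String, String> fetchReferenceData();
}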

What is the best way to get this done?

Regards,
Hayden Marchant



Testing recoverable job state

2017-09-13 Thread Marchant, Hayden
I'm a newbie to Flink and am trying to understand how recovery works using state
backends. I've read the documentation and am now trying to run a simple test to
demonstrate the capabilities - I'd like to test the recovery of a Flink job and how the
state is recovered from where it left off when 'disaster hits'.

Please note that this whole test is being done on a Windows workstation through my IDE.
I am running a LocalFlinkMiniCluster and have enabled checkpointing using
FsStateBackend. I am using Kafka as a source. When running this Flink job, I see that a
new directory is created within the FsStateBackend base directory with a randomly
generated JobID. I assume that if a Task fails within the job, the state stored in the
backend will be used to restart the relevant operator instances from the most recent
checkpoint. I have tried simulating this by throwing an exception in one of the
operators, though I'm not sure what the expected behavior is now - will the Task be
killed, or will just that 'bad' tuple be ignored?
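
For reference, a minimal sketch of the kind of setup described (Flink 1.3-era APIs; the
checkpoint path is a placeholder). With a restart strategy configured, an operator that
throws causes the job to be restarted from the latest completed checkpoint and the
failing record is re-processed rather than silently skipped; without one, the job
simply fails.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 5 seconds into a local directory (placeholder path).
        env.enableCheckpointing(5000);
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        // Restart up to 3 times, 1 second apart, recovering state from the last checkpoint.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));

        // ... add the Kafka source and the operators under test here, then:
        // env.execute("checkpoint-recovery-test");
    }
}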

Also, and more importantly, I would like to simulate a more 'drastic' failure - that of
my whole Flink cluster going down. In my test I would do this simply by killing my
single LocalFlinkMiniCluster process. In that case, I would like my job to resume when
I restart the Flink cluster. However, when I do that, my code launches a new job, with
the same code, but running with a new Job ID. How do I get it to run with the same Job
ID so that it can use the stored state to recover?

Am I approaching this test in the right way? If not, please give me some 
pointers to better simulate a real system. (Note that in a real system, we 
would like to run on a single node cluster.)

Thanks,
Hayden Marchant




RE: Queryable State

2017-09-13 Thread Marchant, Hayden
I can see the job running in the Flink UI, and I specifically specified the port for
the Job Manager. When I provided a different port, I got an Akka exception; here, it
seems that the code is getting further. I think that it might be connected with how I
am creating the StateDescriptor. What exactly does it mean when the KvStateLocation
can't be found?

-Original Message-
From: Biplob Biswas [mailto:revolutioni...@gmail.com] 
Sent: Wednesday, September 13, 2017 2:20 PM
To: user@flink.apache.org
Subject: Re: Queryable State

Hi, 


are you sure your jobmanager is running and is accessible from the supplied hostname
and port? If you can start up the Flink UI of the job which creates your queryable
state, it should have the details of the job manager and the port to be used in this
queryable client job.



 


QueryableState - No KvStateLocation found for KvState instance

2017-09-13 Thread Marchant, Hayden
I am trying to use queryable state, and am encountering issues when querying 
the state from the client. I get the following exception:

Exception in thread "main" org.apache.flink.runtime.query.UnknownKvStateLocation: No
KvStateLocation found for KvState instance with name 'word_sums'.
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleKvStateMessage(JobManager.scala:1532)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:777)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)

In my flow, I am creating the queryable state in the following way:

final TypeSerializer<Tuple2<String, Integer>> valueSerializer =
    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
        .createSerializer(new ExecutionConfig());
ValueStateDescriptor<Tuple2<String, Integer>> vsd =
    new ValueStateDescriptor<>(WORD_SUMS_STATE, valueSerializer);
QueryableStateStream<String, Tuple2<String, Integer>> tupleTuple2QueryableStateStream =
    wordsSummedStream.asQueryableState(WORD_SUMS_STATE, vsd);

I am using LocalFlinkMiniCluster and have enabled 
QueryableStateOptions.SERVER_ENABLE in the configuration. From the logs in the 
startup of the flow, I see that the queryable state operator is running. I also 
see the queryable state operation from the web console 

Is there anything else that I am missing?

Thanks,
Hayden Marchant




Shuffling between map and keyBy operator

2017-09-05 Thread Marchant, Hayden
I have a streaming application that has a keyBy operator followed by an operator
working on the keyed values (a custom sum operator). If the map operator and the
aggregate operator are running on the same Task Manager, will Flink always serialize
and deserialize the tuples, or is there an optimization in this case due to 'locality'?

(I was planning on deploying my Flink Streaming application to a single 'big' 
node in the hope that I can reduce latency by saving on both network and serde.)


Thanks,
Hayden Marchant




Very low-latency - is it possible?

2017-08-31 Thread Marchant, Hayden
We're about to get started on a 9-person-month PoC using Flink Streaming. Before we get
started, I am interested to know how low a latency I can expect for my end-to-end flow
for a single event (from source to sink).

Here is a very high-level description of our Flink design: 
We need at-least-once semantics, and our main application flow is parsing a message
(< 50 microseconds) from Kafka, then doing a keyBy on the parsed event (< 1kb), then
updating a very small user state in the KeyedStream, and then doing another keyBy and
another operator on that KeyedStream. Each of the operators is a very simple
operation - very little calculation and no I/O.


** Our requirement is to get close to 1ms (99%) or lower for end-to-end processing (the
timer starts once we get the message from Kafka). Is this at all realistic if our flow
contains 2 aggregations? If so, what optimizations might we need to get there regarding
cluster configuration (both Flink and hardware)? Our throughput is possibly small
enough (40,000 events per second) that we could run on one node - which might eliminate
some network latency.

I did read in 
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
 in Exactly Once vs At Least Once that a few milliseconds is considered super 
low-latency - wondering if we can get lower.

Any advice or 'war stories' are very welcome.

Thanks,
Hayden Marchant




RE: Using local FS for checkpoint

2017-08-31 Thread Marchant, Hayden
I didn't think about NFS. That would save me the hassle of installing an HDFS cluster
just for that, especially if my organization already has an NFS 'handy'.

Thanks
Hayden

From: Tony Wei [mailto:tony19920...@gmail.com]
Sent: Thursday, August 31, 2017 12:12 PM
To: Marchant, Hayden [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: Using local FS for checkpoint

Hi Marchant,

HDFS is not a must for storing checkpoints. S3 or NFS are both acceptable, as long as
the storage is accessible from the job manager and the task managers.
For AWS S3 configuration, you can refer to this page:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html

Best,
Tony Wei

2017-08-31 15:53 GMT+08:00 Marchant, Hayden (hayden.march...@citi.com):
Whether I use the RocksDB or FS state backend, if my requirements are fault tolerance
and the ability to recover with 'at-least-once' semantics for my Flink job, is there
still a valid case for using a backing local FS for storing state? i.e. if a Flink node
is invalidated, I would have thought that the only way it could recover (by restarting
the task on a different node) is if the state is stored in a shared file system such as
HDFS, S3, etc.

I am asking since I want to know if HDFS is a must have for my deployment.

Thanks,

Hayden




Using local FS for checkpoint

2017-08-31 Thread Marchant, Hayden
Whether I use the RocksDB or FS state backend, if my requirements are fault tolerance
and the ability to recover with 'at-least-once' semantics for my Flink job, is there
still a valid case for using a backing local FS for storing state? i.e. if a Flink node
is invalidated, I would have thought that the only way it could recover (by restarting
the task on a different node) is if the state is stored in a shared file system such as
HDFS, S3, etc.

I am asking since I want to know if HDFS is a must have for my deployment.

Thanks,

Hayden