Weird error in submitting a flink job to yarn cluster

2017-10-02 Thread vipul singh
Hello,

I am working on a ParquetSink writer, which will convert a kafka stream to
parquet format. I am having some weird issues in deploying this application
to a yarn cluster. I am not 100% sure this is a Flink-related
error, but I wanted to reach out to folks here in case it might be.


If I launch Flink within YARN only for executing a single job, it runs ok.
This is the command I use for the deployment:

Command: flink run --jobmanager yarn-cluster -ytm 4096 -yjm 1048 -ys 2
-yn 2 -d -c  jar_name.jar

However, as soon as I try to submit a similar job to an already running yarn
cluster, I start to get these errors
(https://gist.github.com/neoeahit/f0130e9f447ea9c2baa38bf5ee4e6a57) and the
application crashes. I checked the location in /tmp where I am creating
the file, and there is no file existing there.

Command: flink run -yid application_id -d -c  jar_name.jar


A bit more about my algorithm: I use a temp array to buffer messages in the
@invoke method, and when specific thresholds are reached I create a parquet
file with this buffered data. Once a tmp parquet file is created, I upload
this file to long-term storage.

The code to write buffered data to a parquet file is:

 // assuming the usual imports, e.g.:
 // import org.apache.parquet.avro.AvroParquetWriter

 // build a writer for the temporary pending file, using the Avro schema of the buffered records
 writer = Some(AvroParquetWriter.builder(getPendingFilePath(tmp_filename.get))
   .withSchema(schema.get)
   .withCompressionCodec(compressionCodecName)
   .withRowGroupSize(blockSize)
   .withPageSize(pageSize)
   .build())
// drain the buffered messages into the parquet file and close it
bufferedMessages.foreach { e =>
  writer.get.write(e.payload)
}
writer.get.close()


Please do let me know if anyone has run into this before.

Thanking in advance,
- Vipul


Re: kafka consumer parallelism

2017-10-02 Thread Timo Walther

Hi,

I'm not a Kafka expert, but I think you need more than one Kafka 
partition to process multiple documents at the same time. Also make sure 
to send the documents to different partitions.


Regards,
Timo


Am 10/2/17 um 6:46 PM schrieb r. r.:

Hello
I'm running a job with "flink run -p5" and additionally set 
env.setParallelism(5).
The source of the stream is Kafka, the job uses FlinkKafkaConsumer010.
In the Flink UI, though, I notice that if I send 3 documents to Kafka, only one 
'instance' of the consumer seems to receive Kafka's records and send them to the 
next operators, which according to the Flink UI are properly parallelized.
What's the explanation of this behavior?
According to sources:

To enable parallel execution, the user defined source should implement
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction or extend
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction,
which FlinkKafkaConsumer010 does

Please check the screenshot at https://imgur.com/a/E1H9r; you'll see that only 
one of them sends 3 records to the sinks

My code is here: https://pastebin.com/yjYCXAAR

Thanks!





Re: Session Window set max timeout

2017-10-02 Thread Timo Walther

Hi,

I would recommend implementing a custom trigger in this case. You can 
override the default trigger of your window:


https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#triggers

This is the interface where you can control the triggering:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
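
For illustration, here is a minimal sketch of such a trigger (the class name
and the max-duration check are my own, not a built-in): it behaves like the
default event-time trigger, but fires and purges a session window once the
merged window spans more than a given maximum duration.

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class CappedEventTimeTrigger(maxDurationMs: Long) extends Trigger[Any, TimeWindow] {

  override def onElement(element: Any, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    if (window.maxTimestamp() - window.getStart >= maxDurationMs) {
      // the session already runs longer than we accept: emit and clear it now
      TriggerResult.FIRE_AND_PURGE
    } else {
      // default behaviour: fire when the watermark passes the end of the window
      ctx.registerEventTimeTimer(window.maxTimestamp())
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // session windows merge, so the trigger must support merging too
  override def canMerge(): Boolean = true

  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit =
    ctx.registerEventTimeTimer(window.maxTimestamp())

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp())
}

It would then be plugged in with something like
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.trigger(new CappedEventTimeTrigger(Time.hours(1).toMilliseconds)).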

I hope this helps.

Regards,
Timo


Am 10/2/17 um 4:42 PM schrieb ant burton:

Is it possible to limit session windowing to a max of n seconds/hours etc?

i.e. I would like a session window, but if a session runs for an 
unacceptable amount of time, I would like to close it.


Thanks,





Re: Flink Watermark and timing

2017-10-02 Thread Timo Walther

Hi Björn,


I don't know if I understand your example correctly, but I think your 
explanation "All events up to and equal to the watermark should be handled 
in the previous window" is not 100% correct. Watermarks just indicate 
progress ("up to here we have seen all events with a lower timestamp 
than X") and trigger the evaluation of a window. The assignment of 
events to windows is based on the timestamp, not the watermark. The 
documentation will be improved for the upcoming release:


https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#window-assigners

"Time-based windows have a start timestamp (inclusive) and an end 
timestamp (exclusive) that together describe the size of the window. "
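
To make the boundary behaviour concrete, a tiny sketch of how the start of a
5 minute tumbling window is derived from an event timestamp (the epoch values
are assumed to be UTC milliseconds):

// start inclusive, end exclusive: a timestamp equal to a window start
// belongs to that window, not to the previous one
val windowSizeMs = 5 * 60 * 1000L
def windowStart(tsMs: Long): Long = tsMs - (tsMs % windowSizeMs)

val ts = 946692600000L   // 2000-01-01 02:10:00 UTC in millis
windowStart(ts)          // 946692600000 -> window [02:10:00, 02:15:00)
windowStart(ts - 1)      // 946692300000 -> window [02:05:00, 02:10:00)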


I hope this helps.

Regards,
Timo


Am 10/2/17 um 1:06 PM schrieb Björn Zachrisson:

Hi,

I have a question regarding timing of events.

According to;
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#event-time-and-watermarks

All events up to and equal to the watermark should be handled in "the 
previous window".


In my case I use event-timestamp.


I'm testing the timing out.

The case is events from 2000-01-01 02:00:00 and up to 2000-01-01 
02:20:00 where each event is 2 minutes apart. I try to group the 
events into 5-minute windows


2000-01-01 02:00:00 => 2000-01-01 02:05:00
2000-01-01 02:05:00 => 2000-01-01 02:10:00
2000-01-01 02:10:00 => 2000-01-01 02:15:00
2000-01-01 02:15:00 => 2000-01-01 02:20:00

However, the event at the exact time 02:10:00 (946692600) is put in 
the window "2000-01-01 02:10:00 => 2000-01-01 02:15:00", which is not 
according to what I can read in the documentation.


This is the exact result;
2000-01-01 02:00:00, 946692000
2000-01-01 02:02:00, 946692120
2000-01-01 02:04:00, 946692240

2000-01-01 02:06:00, 946692360
2000-01-01 02:08:00, 946692480

2000-01-01 02:10:00, 946692600
2000-01-01 02:12:00, 946692720
2000-01-01 02:14:00, 946692840

2000-01-01 02:16:00, 946692960
2000-01-01 02:18:00, 946693080

2000-01-01 02:20:00, 946693200

Is this due to the event-time extractor I'm using, or what might be 
the case?


Regards
Björn






Re: Avoid duplicate messages while restarting a job for an application upgrade

2017-10-02 Thread Piotr Nowojski
We are planning to work on this clean shutdown after releasing Flink 1.4. 
Implementing this properly would require some work, for example:
- adding some checkpoint options to carry information about a “closing”/“shutting 
down” event
- add clean shutdown to source functions API
- implement handling of this clean shutdown in desired sources

Those are not super complicated changes but also not trivial.

One thing that you could do is to implement a super hacky filter function 
just after the source operator, which you would manually trigger. Normally it 
would pass all of the messages. Once triggered, it would wait for the next 
checkpoint to happen, assume that it is a savepoint, and start filtering out 
all of the subsequent messages. When this checkpoint completes, you could 
manually shut down your Flink application. This could guarantee that there are 
no duplicated writes after a restart. This might work for a clean shutdown, but 
it would be a very hacky solution. 
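
For what it's worth, a very rough sketch of such a drain filter, assuming the
trigger flag is flipped externally (here just a static AtomicBoolean; in
practice a control topic, a file check, etc.) and using CheckpointedFunction
to notice the checkpoint/savepoint barrier. This is only an illustration of
the idea above, not a supported pattern:

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

object DrainSwitch {
  // flipped manually right before taking the savepoint for the upgrade
  val drainRequested = new AtomicBoolean(false)
}

class DrainingFilter[T] extends RichFilterFunction[T] with CheckpointedFunction {

  @volatile private var dropping = false

  // normally passes everything; once draining, drops everything
  override def filter(value: T): Boolean = !dropping

  // called when the checkpoint barrier reaches this operator; we assume this
  // checkpoint is the savepoint taken for the upgrade and drop what follows
  override def snapshotState(ctx: FunctionSnapshotContext): Unit =
    if (DrainSwitch.drainRequested.get()) dropping = true

  override def initializeState(ctx: FunctionInitializationContext): Unit = ()
}

Placed directly after the source (e.g. stream.filter(new DrainingFilter[MyEvent])),
everything the source keeps emitting after that checkpoint barrier would be
dropped, and the job could then be shut down manually once the checkpoint
completes.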

Btw, keep in mind that even with clean shutdown you can end up with duplicated 
messages after a crash and there is no way around this with Kafka 0.9.

Piotrek

> On Oct 2, 2017, at 5:30 PM, Antoine Philippot wrote:
> 
> Thanks Piotr for your answer; we sadly can't use Kafka 0.11 for now (and 
> won't be able to for a while).
> 
> We cannot afford tens of thousands of duplicated messages for each 
> application upgrade; can I help by working on this feature?
> Do you have any hint or details on this part of that "todo list"? 
>  
> 
> On Mon, Oct 2, 2017 at 16:50, Piotr Nowojski wrote:
> Hi,
> 
> For failure recovery with Kafka 0.9 it is not possible to avoid duplicated 
> messages. Using Flink 1.4 (not yet released) combined with Kafka 0.11 it will 
> be possible to achieve end-to-end exactly-once semantics when writing to 
> Kafka. However, this is still a work in progress:
> 
> https://issues.apache.org/jira/browse/FLINK-6988 
> 
> 
> However this is a superset of functionality that you are asking for. 
> Exactly-once just for clean shutdowns is also on our “TODO” list (it 
> would/could support Kafka 0.9), but it is not currently being actively 
> developed.
> 
> Piotr Nowojski
> 
>> On Oct 2, 2017, at 3:35 PM, Antoine Philippot wrote:
>> 
>> Hi,
>> 
>> I'm working on a Flink streaming app with a kafka09 to kafka09 use case 
>> which handles around 100k messages per second.
>> 
>> To upgrade our application we used to run a flink cancel-with-savepoint 
>> command followed by a flink run with the previously saved savepoint and the 
>> new application fat jar as parameters. We noticed that we can have more than 
>> 50k duplicated messages in the Kafka sink, which is not idempotent.
>> 
>> This behaviour is actually problematic for this project, and I am trying to 
>> find a solution / workaround to avoid these duplicated messages.
>> 
>> The JobManager indicates clearly that the cancel call is triggered once the 
>> savepoint is finished, but during the savepoint execution the Kafka source 
>> continues to poll new messages, which will not be part of the savepoint and 
>> will be replayed on the next application start.
>> 
>> I tried to find a solution with the stop command line argument, but the Kafka 
>> source doesn't implement StoppableFunction 
>> (https://issues.apache.org/jira/browse/FLINK-3404) and savepoint 
>> generation is not available with stop, in contrast to cancel.
>> 
>> Is there another solution to avoid processing duplicated messages for each 
>> application upgrade or rescaling?
>> 
>> If not, has someone planned to implement it? Otherwise, I can propose a pull 
>> request after some architecture advice.
>> 
>> The final goal is to stop polling the source and trigger a savepoint once 
>> polling has stopped.
>> 
>> Thanks
> 



kafka consumer parallelism

2017-10-02 Thread r. r.
Hello
I'm running a job with "flink run -p5" and additionally set 
env.setParallelism(5).
The source of the stream is Kafka, the job uses FlinkKafkaConsumer010.
In the Flink UI, though, I notice that if I send 3 documents to Kafka, only one 
'instance' of the consumer seems to receive Kafka's records and send them to the 
next operators, which according to the Flink UI are properly parallelized.
What's the explanation of this behavior?
According to sources:

To enable parallel execution, the user defined source should implement
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction or extend
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction,
which FlinkKafkaConsumer010 does

Please check the screenshot at https://imgur.com/a/E1H9r; you'll see that only 
one of them sends 3 records to the sinks

My code is here: https://pastebin.com/yjYCXAAR

Thanks!



Re: Enriching data from external source with cache

2017-10-02 Thread Derek VerLee

  
  
Thanks Timo, watching the video now.

I did try out the method with iteration in a simple prototype and it works.
But you are right, combining it with the other requirements into a single
process function has so far resulted in more complexity than I'd like, and
it's always nice to leave something easily understood later.

On the contribution, I was wondering if there was something scary with async
and keyed state going on that prevented this from having happened already.
I'll have a look and see if I can find where the current non-keyed
implementation logic resides in the project.

Thanks

On 10/2/17 6:07 AM, Timo Walther wrote:

Hi Derek,

maybe the following talk can inspire you, how to do this with joins and
async IO: https://www.youtube.com/watch?v=Do7C4UJyWCM (around the 17th min).
Basically, you split the stream and wait for an Async IO result in a
downstream operator.

But I think having a transient guava cache is not a bad idea; since it is
only a cache, it does not need to be checkpointed and can be recovered at
any time.

Implementing your own logic in a ProcessFunction is always an option, but
might require more implementation effort.

Btw. if you feel brave enough, you could also think of contributing a
stateful async IO. It should not be too much effort to make this work.

Regards,
Timo

Am 9/29/17 um 8:39 PM schrieb Derek VerLee:

My basic problem will sound familiar, I think: I need to enrich incoming
data using a REST call to an external system for slowly evolving metadata,
and some cache-based lag is acceptable, so to reduce load on the external
system and to process more efficiently, I would like to implement a cache.
The cache would be by key, and I am already doing a keyBy for the same key
in the job.

Please correct me if I'm wrong:

* Keyed State would be great to store my metadata "cache", Async I/O is
ideal for pulling from the external system,

but AsyncFunction can not access keyed state ("Exception: State is not
supported in rich async functions.") and operators can not share state
between them.

This leaves me wondering, since side inputs are not here yet, what the best
(and perhaps most idiomatic) way to approach my problem?

I'd rather keep changes to existing systems minimal for this iteration and
just minimize impact on them during peaks best I can... systemic refactoring
and re-architecture will be coming soon (so I'm happy to hear thoughts on
that as well).

Approaches considered:

1. AsyncFunction with a transient guava cache.  Not ideal ... but maybe good
enough to get by

2. Using compound message types (oh, if only java had real algebraic data
types...) and send cache miss messages from some CacheEnrichmentMapper
(keyed) to some AsyncCacheLoader (not keyed) which then backfeeds cache
updates to the former via iteration ... i don't know why this couldn't work
but it feels like a hot mess unless there is some way I am not thinking of
to do it cleanly

3. One user mentioned on a similar thread loading the data in as another
DataStream and then using joins, but I'm confused about how this would work;
it seems to me that joins happen on windows, windows pertain to (some notion
of) time, so what would be my notion of time for the slow (maybe years old
in some cases) meta-data?

4. Forget about async I/O

5. implement my own "async i/o" using a process function or similar ... is
this a valid pattern?
  
  
  


  



Re: Avoid duplicate messages while restarting a job for an application upgrade

2017-10-02 Thread Antoine Philippot
Thanks Piotr for your answer; we sadly can't use Kafka 0.11 for now (and
won't be able to for a while).

We cannot afford tens of thousands of duplicated messages for each
application upgrade; can I help by working on this feature?
Do you have any hint or details on this part of that "todo list"?


On Mon, Oct 2, 2017 at 16:50, Piotr Nowojski wrote:

> Hi,
>
> For failure recovery with Kafka 0.9 it is not possible to avoid
> duplicated messages. Using Flink 1.4 (not yet released) combined with Kafka
> 0.11 it will be possible to achieve end-to-end exactly-once semantics when
> writing to Kafka. However, this is still a work in progress:
>
> https://issues.apache.org/jira/browse/FLINK-6988
>
> However this is a superset of functionality that you are asking for.
> Exactly-once just for clean shutdowns is also on our “TODO” list (it
> would/could support Kafka 0.9), but it is not currently being actively
> developed.
>
> Piotr Nowojski
>
> On Oct 2, 2017, at 3:35 PM, Antoine Philippot 
> wrote:
>
> Hi,
>
> I'm working on a Flink streaming app with a kafka09 to kafka09 use case
> which handles around 100k messages per second.
>
> To upgrade our application we used to run a flink cancel-with-savepoint
> command followed by a flink run with the previously saved savepoint and the
> new application fat jar as parameters. We noticed that we can have more than
> 50k duplicated messages in the Kafka sink, which is not idempotent.
>
> This behaviour is actually problematic for this project, and I am trying to
> find a solution / workaround to avoid these duplicated messages.
>
> The JobManager indicates clearly that the cancel call is triggered once
> the savepoint is finished, but during the savepoint execution the Kafka source
> continues to poll new messages, which will not be part of the savepoint and
> will be replayed on the next application start.
>
> I tried to find a solution with the stop command line argument, but the Kafka
> source doesn't implement StoppableFunction (
> https://issues.apache.org/jira/browse/FLINK-3404) and savepoint
> generation is not available with stop, in contrast to cancel.
>
> Is there another solution to avoid processing duplicated messages for each
> application upgrade or rescaling?
>
> If not, has someone planned to implement it? Otherwise, I can propose a
> pull request after some architecture advice.
>
> The final goal is to stop polling the source and trigger a savepoint once
> polling has stopped.
>
> Thanks
>
>
>


Re: Avoid duplicate messages while restarting a job for an application upgrade

2017-10-02 Thread Piotr Nowojski
Hi,

For failure recovery with Kafka 0.9 it is not possible to avoid duplicated 
messages. Using Flink 1.4 (not yet released) combined with Kafka 0.11 it will be 
possible to achieve end-to-end exactly-once semantics when writing to Kafka. 
However, this is still a work in progress:

https://issues.apache.org/jira/browse/FLINK-6988 


However, this is a superset of the functionality that you are asking for. 
Exactly-once just for clean shutdowns is also on our “TODO” list (it 
would/could support Kafka 0.9), but it is not currently being actively 
developed.

Piotr Nowojski

> On Oct 2, 2017, at 3:35 PM, Antoine Philippot wrote:
> 
> Hi,
> 
> I'm working on a Flink streaming app with a kafka09 to kafka09 use case which 
> handles around 100k messages per second.
> 
> To upgrade our application we used to run a flink cancel-with-savepoint 
> command followed by a flink run with the previously saved savepoint and the new 
> application fat jar as parameters. We noticed that we can have more than 50k 
> duplicated messages in the Kafka sink, which is not idempotent.
> 
> This behaviour is actually problematic for this project, and I am trying to 
> find a solution / workaround to avoid these duplicated messages.
> 
> The JobManager indicates clearly that the cancel call is triggered once the 
> savepoint is finished, but during the savepoint execution the Kafka source 
> continues to poll new messages, which will not be part of the savepoint and 
> will be replayed on the next application start.
> 
> I tried to find a solution with the stop command line argument, but the Kafka 
> source doesn't implement StoppableFunction 
> (https://issues.apache.org/jira/browse/FLINK-3404) and savepoint 
> generation is not available with stop, in contrast to cancel.
> 
> Is there another solution to avoid processing duplicated messages for each 
> application upgrade or rescaling?
> 
> If not, has someone planned to implement it? Otherwise, I can propose a pull 
> request after some architecture advice.
> 
> The final goal is to stop polling the source and trigger a savepoint once 
> polling has stopped.
> 
> Thanks



Session Window set max timeout

2017-10-02 Thread ant burton
Is it possible to limit session windowing to a max of n seconds/hours etc?

i.e. I would like a session window, but if a session runs for an
unacceptable amount of time, I would like to close it.

Thanks,


At end of complex parallel flow, how to force end step with parallel=1?

2017-10-02 Thread Garrett Barton
I have a complex alg implemented using the DataSet api and by default it
runs with parallelism 90 for good performance. At the end I want to perform a
clustering of the resulting data, and to do that correctly I need to pass
all the data through a single thread/process.

I read in the docs that as long as I did a global reduce using
DataSet.reduceGroup(new GroupReduceFunction) it would force it to
a single thread. Yet when I run the flow and bring it up in the UI, I see
parallelism 90 all the way through the DAG, including this one.

Is there a config or feature to force the flow back to a single thread? Or
should I just split this into two completely separate jobs? I'd rather not
split, as I would like to use Flink's ability to iterate on this alg and
cluster combo.
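
For what it's worth, here is a minimal sketch of pinning only the final global
reduce to a single parallel instance, with a toy data set and a stand-in for
the real clustering step (an assumed skeleton, not the actual code):

import org.apache.flink.api.scala._

object ForceSingleThreadExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)   // 90 in the real job; the heavy part keeps this value

    val data: DataSet[Double] = env.fromCollection(1 to 1000).map(_.toDouble)

    // a global (non-grouped) reduceGroup sees all elements in one call;
    // setting its parallelism to 1 explicitly also makes the UI show one subtask
    val reduced = data.reduceGroup { (values: Iterator[Double]) => values.sum }
    reduced.setParallelism(1)   // stand-in for the clustering step, pinned to one slot
    reduced.print()
  }
}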

Thank you


Savepoints - jobmanager.rpc.address

2017-10-02 Thread ant burton
Hi,

When taking a savepoint on AWS EMR I get the following error

[hadoop@ip-10-12-169-172 ~]$ flink savepoint
e14a6402b6f1e547c4adf40f43861c27
Retrieving JobManager.


 The program finished with the following exception:

org.apache.flink.configuration.IllegalConfigurationException: Couldn't
retrieve client for cluster
at
org.apache.flink.client.CliFrontend.retrieveClient(CliFrontend.java:925)
at
org.apache.flink.client.CliFrontend.getJobManagerGateway(CliFrontend.java:939)
at
org.apache.flink.client.CliFrontend.triggerSavepoint(CliFrontend.java:714)
at
org.apache.flink.client.CliFrontend.savepoint(CliFrontend.java:704)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1096)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
at
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
*Caused by: java.lang.RuntimeException: Couldn't retrieve standalone
cluster*
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:48)
at
org.apache.flink.client.cli.DefaultCLI.retrieveCluster(DefaultCLI.java:74)
at
org.apache.flink.client.cli.DefaultCLI.retrieveCluster(DefaultCLI.java:38)
at
org.apache.flink.client.CliFrontend.retrieveClient(CliFrontend.java:920)
... 12 more
*Caused by: org.apache.flink.util.ConfigurationException: Config parameter
'Key: 'jobmanager.rpc.address' , default: null (deprecated keys: [])' is
missing (hostname/address of JobManager to connect to).*
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getJobManagerAddress(HighAvailabilityServicesUtils.java:119)
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:76)
at
org.apache.flink.client.program.ClusterClient.<init>(ClusterClient.java:131)
at
org.apache.flink.client.program.StandaloneClusterClient.<init>(StandaloneClusterClient.java:42)
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:46)
... 15 more

My configuration.json is

[
{
"Classification": "flink-conf",
"Properties": {
"taskmanager.numberOfTaskSlots":"1",
"state.backend": "filesystem",
"state.checkpoints.dir": "s3://flink/checkpoints/",
"state.backend.fs.checkpointdir": "s3://flink/checkpoints/"
}
}
]


Setting the following in configuration.json does not resolve the issue.

jobmanager.rpc.address: localhost or 0.0.0.0 or 127.0.0.1
jobmanager.rpc.port: 6123


Thanks,


Avoid duplicate messages while restarting a job for an application upgrade

2017-10-02 Thread Antoine Philippot
Hi,

I'm working on a Flink streaming app with a kafka09 to kafka09 use case
which handles around 100k messages per second.

To upgrade our application we used to run a flink cancel-with-savepoint
command followed by a flink run with the previously saved savepoint and the
new application fat jar as parameters. We noticed that we can have more than
50k duplicated messages in the Kafka sink, which is not idempotent.

This behaviour is actually problematic for this project, and I am trying to
find a solution / workaround to avoid these duplicated messages.

The JobManager indicates clearly that the cancel call is triggered once the
savepoint is finished, but during the savepoint execution the Kafka source
continues to poll new messages, which will not be part of the savepoint and
will be replayed on the next application start.

I tried to find a solution with the stop command line argument, but the Kafka
source doesn't implement StoppableFunction (
https://issues.apache.org/jira/browse/FLINK-3404) and savepoint
generation is not available with stop, in contrast to cancel.

Is there another solution to avoid processing duplicated messages for each
application upgrade or rescaling?

If not, has someone planned to implement it? Otherwise, I can propose a pull
request after some architecture advice.

The final goal is to stop polling the source and trigger a savepoint once
polling has stopped.

Thanks


RE: In-memory cache

2017-10-02 Thread Marchant, Hayden
Nice idea. Actually we are looking at connect for other parts of our solution 
in which the latency is less critical.

A few considerations of not using ‘connect’ in this case were:


1. To isolate the two streams from each other to reduce complexity, simplify 
debugging, etc. Since we are newbies at Flink, I was thinking that it is 
beneficial to keep each stream as simple as possible, and if need be, we can 
interface between them to ‘exchange data’.

2. The reference data, even though quite small, is updated every 100ms. 
Since we would need this reference data on each ‘consuming’ operator instance, 
we would essentially nearly double the amount of tuples coming through the 
operator. Since low latency is key here, this was a concern, the assumption 
being that the two sides of the ‘connect’ share the same resources, whereas 
using a background thread to update a ‘map’ would not be competing with the 
incoming tuples.

I realize that structurally, connect is a neater solution.

If I can be convinced that my above concerns are unfounded, I’ll be happy to 
try that direction.

Thanks
Hayden

From: Stavros Kontopoulos [mailto:st.kontopou...@gmail.com]
Sent: Monday, October 02, 2017 2:24 PM
To: Marchant, Hayden [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: In-memory cache

How about connecting two streams of data, one from the reference data and one 
from the main data (I assume using key streams as you mention QueryableState) 
and keep state locally within the operator.
The idea is to have a local sub-copy of the reference data within the operator 
that is updated from the source of the reference data. Reference data are still 
updated
externally from that low latency flink app. Here is a relevant question: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-state-in-connected-streams-td8727.html.
 Would that help?

Stavros



On Mon, Oct 2, 2017 at 1:46 PM, Marchant, Hayden <hayden.march...@citi.com> wrote:
We have an operator in our streaming application that needs to access 
'reference data' that is updated by another Flink streaming application. This 
reference data has about ~10,000 entries and has a small footprint. This 
reference data needs to be updated ~ every 100 ms. The required latency for  
this application is extremely low ( a couple of milliseconds), and we are 
therefore cautious of paying cost of I/O to access the reference data remotely. 
We are currently examining 3 different options for accessing this reference 
data:

1. Expose the reference data as QueryableState and access it directly from the 
'client' streaming operator using the QueryableState API
2. same as #1, but create an In-memory Java cache of the reference data within 
the operator that is asynchronously updated at a scheduled frequency using the 
QueryableState API
3. Output the reference data to Redis, and create an in-memory java cache of 
the reference data within the operator that is asynchronously updated at a 
scheduled frequency using Redis API.

My understanding is that one of the cons of using Queryable state, is that if 
the Flink application that generates the reference data is unavailable, the 
Queryable state will not exist - is that correct?

If we were to use an asynchronously scheduled 'read' from the distributed 
cache, where should it be done? I was thinking of using 
ScheduledExecutorService from within the open method of the Flink operator.

What is the best way to get this done?

Regards,
Hayden Marchant



Fwd: Consult about flink on mesos cluster

2017-10-02 Thread Bo Yu
Hello all,
This is Bo. I met some problems when I tried to use Flink in my Mesos
cluster (1 master, 2 slaves, each with 32 CPU cores).
I tried to start mesos-appmaster.sh in Marathon; the JobManager is
started without problems.

mesos-appmaster.sh -Djobmanager.heap.mb=1024 -Dtaskmanager.heap.mb=1024
-Dtaskmanager.numberOfTaskSlots=32

My problem is that the task managers are all located on one single slave.
1. (log1)
The initial tasks setting in "/usr/local/flink/conf/flink-conf.yaml" is set as
"mesos.initial-tasks: 2".
I also set "mesos.constraints.hard.hostattribute: rack:ak09-27",
which is the master node of the Mesos cluster.

2. (log2)
I tried many ways to distribute the tasks to all the available slaves,
without any success.
So I decided to try adding a GROUP_BY operator, which I referenced from
https://mesosphere.github.io/marathon/docs/constraints.html
"mesos.constraints.hard.hostattribute: rack:ak09-27,GROUP_BY:2"
According to the log, Flink keeps waiting for more offers and the tasks
never get launched.

Sorry, I am a newbie to Flink and also to Mesos. Please reply if my problem is
not clear, and I would appreciate any hint about how to distribute
tasks evenly across the available resources.

Thank you in advance.

Best regards,

Bo
2017-10-02 10:51:28,023 INFO  
org.apache.flink.mesos.runtime.clusterframework.MesosJobManager  - JobManager 
akka.tcp://flink@x103.tail_of_hostname:6123/user/jobmanager was granted 
leadership with leader session ID Some(661d24c7--4d15-8583-efd2ee3c2e3b).
2017-10-02 10:51:28,032 INFO  
org.apache.flink.mesos.runtime.clusterframework.MesosJobManager  - Delaying 
recovery of all jobs by 1 milliseconds.
2017-10-02 10:51:28,041 INFO  
org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
reachable under 
akka.tcp://flink@x103.tail_of_hostname:6123/user/jobmanager:661d24c7--4d15-8583-efd2ee3c2e3b.
2017-10-02 10:51:28,044 INFO  
org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
Trying to associate with JobManager leader 
akka.tcp://flink@x103.tail_of_hostname:6123/user/jobmanager
2017-10-02 10:51:28,051 INFO  
org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
Resource Manager associating with leading JobManager 
Actor[akka://flink/user/jobmanager#923003410] - leader session 
661d24c7--4d15-8583-efd2ee3c2e3b
2017-10-02 10:51:28,176 INFO  
org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
Scheduling Mesos task taskmanager-1 with (1024.0 MB, 1.0 cpus).
2017-10-02 10:51:28,193 INFO  
org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
Scheduling Mesos task taskmanager-2 with (1024.0 MB, 1.0 cpus).
2017-10-02 10:51:28,195 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator- Now gathering 
offers for at least 2 task(s).
2017-10-02 10:51:28,207 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator- Received 
offer(s) of 252958.0 MB, 61.25 cpus:
2017-10-02 10:51:28,208 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator-   
18f3d9d0-7982-491a-8397-0e799272fd2d-O1246 from x101.tail_of_hostname 
of 127375.0 MB, 31.5 cpus
2017-10-02 10:51:28,208 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator-   
18f3d9d0-7982-491a-8397-0e799272fd2d-O1247 from x103.tail_of_hostname 
of 125583.0 MB, 29.75 cpus
2017-10-02 10:51:29,225 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator- Processing 2 
task(s) against 2 new offer(s) plus outstanding offers.
2017-10-02 10:51:29,241 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator- Resources 
considered: (note: expired offers not deducted from below)
2017-10-02 10:51:29,243 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator-   
x103.tail_of_hostname has 125583.0 MB, 29.75 cpus
2017-10-02 10:51:29,243 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator-   
x101.tail_of_hostname has 127375.0 MB, 31.5 cpus
2017-10-02 10:51:29,413 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator- Launched 2 
task(s) on x101.tail_of_hostname using 1 offer(s):
2017-10-02 10:51:29,413 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator-   
18f3d9d0-7982-491a-8397-0e799272fd2d-O1246
2017-10-02 10:51:29,414 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator- No longer 
gathering offers; all requests fulfilled.
2017-10-02 10:51:29,414 INFO  com.netflix.fenzo.TaskScheduler   
- Expiring all leases
2017-10-02 10:51:29,415 INFO  
org.apache.flink.mesos.scheduler.LaunchCoordinator- Declined offer 
18f3d9d0-7982-491a-8397-0e799272fd2d-O1247 from x103.tail_of_hostname 
of 125583.0 MB, 29.75 cpus.
2017-10-02 10:51:29,518 INFO  
org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
Launching Mesos task taskmanager-000

Re: how many 'run -c' commands to start?

2017-10-02 Thread r. r.
Thanks, Chesnay, that was indeed the problem.
It also explains why -p5 was not working for me from the cmdline

Best regards
Robert






 > -------- Original message --------
 > From: Chesnay Schepler ches...@apache.org
 > Subject: Re: how many 'run -c' commands to start?
 > To: "r. r."
 > Sent on: 01.10.2017 00:58
 >
 > The order in which you pass program arguments is quite important. Can
 > you try again putting --detached directly after run?
 > I.e. flink run --detached -c ...
 >
 > The reason being that arguments after the jar are treated as arguments
 > for the user-program, and not as arguments for the job execution.
 >
 > On 29.09.2017 17:23, r. r. wrote:
 > > Sure, here is the cmdline output:
 > >
 > > flink-1.3.2/bin/flink run -c com.corp.flink.KafkaJob quickstart.jar --topic
 > > InputQ --bootstrap.servers localhost:9092 --zookeeper.connect
 > > localhost:2181 --group.id Consumers -p 5 --detached
 > > Cluster configuration: Standalone cluster with JobManager at
 > > localhost/127.0.0.1:6123
 > > Using address localhost:6123 to connect to JobManager.
 > > JobManager web interface address http://localhost:8081
 > > Starting execution of program
 > > Submitting job with JobID: 5cc74547361cec2ff9874764dac9ee91. Waiting for
 > > job completion.
 > > Connected to JobManager at
 > > Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-100905913] with
 > > leader session id ----.
 > > 09/28/2017 16:30:40    Job execution switched to status RUNNING.
 > > 09/28/2017 16:30:40    Source: Custom Source -> Map(1/5) switched to SCHEDULED
 > > 09/28/2017 16:30:40    Source: Custom Source -> Map(2/5) switched to SCHEDULED
 > > 09/28/2017 16:30:40    Source: Custom Source -> Map(3/5) switched to SCHEDULED
 > > 09/28/2017 16:30:40    Source: Custom Source -> Map(4/5) switched to SCHEDULED
 > > 09/28/2017 16:30:40    Source: Custom Source -> Map(5/5) switched to SCHEDULED
 > > 09/28/2017 16:30:40    Sink: Unnamed(1/5) switched to SCHEDULED
 > > 09/28/2017 16:30:40    Sink: Unnamed(2/5) switched to SCHEDULED
 > > 09/28/2017 16:30:40    Sink: Unnamed(3/5) switched to SCHEDULED
 > > 09/28/2017 16:30:40    Sink: Unnamed(4/5) switched to SCHEDULED
 > > 09/28/2017 16:30:40    Sink: Unnamed(5/5) switched to SCHEDULED
 > > ...
 > >
 > > I thought --detach will put the process in the background, and give me back
 > > the cmdline, but maybe I got the meaning behind this option wrong?
 > >
 > > Thank you!
 > >
 > >  > -------- Original message --------
 > >  > From: Chesnay Schepler ches...@apache.org
 > >  > Subject: Re: how many 'run -c' commands to start?
 > >  > To: user@flink.apache.org
 > >  > Sent on: 29.09.2017 18:01
 > >
 > >> The only nodes that matter are those on which the Flink processes, i.e.
 > >> Job- and TaskManagers, are being run.
 > >>
 > >> To prevent a JobManager node failure from causing the job to fail you
 > >> have to look into an HA setup.
 > >> (The jobmanager is responsible for distributing/coordinating work)
 > >>
 > >> In case of a TaskManager node failure the job will fail and restart,
 > >> provided that enough TaskManagers are still alive to satisfy the
 > >> resource requirements of the job.
 > >>
 > >> Can you elaborate a bit more what happened when you used the
 > >> --detached param?
 > >>
 > >> On 28.09.2017 16:33, r. r. wrote:
 > >> Thank you, Chesnay
 > >> to make sure - should the node where the job has been submitted go down,
 > >> the processing will continue, I hope?
 > >> Do I need to ensure this by configuration?
 > >>
 > >> btw I added --detached param to the run cmd, but it didn't go into
 > >> background process as I would've expected. Am I guessing wrong?
 > >>
 > >> Thanks!
 > >> Rob
 > >>
 > >>  > -------- Original message --------
 > >>  > From: Chesnay Schepler ches...@apache.org
 > >>  > Subject: Re: how many 'run -c' commands to start?
 > >>  > To: user@flink.apache.org
 > >>  > Sent on: 28.09.2017 15:05
 > >>
 > >> Hi!
 > >>
> >>  Given a Flink cluster,

Re: In-memory cache

2017-10-02 Thread Stavros Kontopoulos
How about connecting two streams of data, one from the reference data and
one from the main data (I assume using key streams as you mention
QueryableState) and keep state locally within the operator.
The idea is to have a local sub-copy of the reference data within the
operator that is updated from the source of the reference data. Reference
data are still updated
externally from that low latency flink app. Here is a relevant question:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-state-in-connected-streams-td8727.html.
Would that help?
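
To make the idea concrete, a minimal sketch of such a connected-streams
operator; Event, RefUpdate and the field names are placeholder types I made
up, not your actual ones:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

case class Event(key: String, payload: String)
case class RefUpdate(key: String, meta: String)

class EnrichWithReference extends RichCoFlatMapFunction[Event, RefUpdate, (Event, String)] {

  // per-key copy of the latest reference data, kept in keyed state
  @transient private var refState: ValueState[String] = _

  override def open(parameters: Configuration): Unit =
    refState = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("reference-meta", classOf[String]))

  // main stream: enrich with whatever reference data we have for this key
  override def flatMap1(event: Event, out: Collector[(Event, String)]): Unit = {
    val meta = Option(refState.value()).getOrElse("unknown")
    out.collect((event, meta))
  }

  // reference stream: only update the local per-key copy, emit nothing
  override def flatMap2(update: RefUpdate, out: Collector[(Event, String)]): Unit =
    refState.update(update.meta)
}

It would be wired in with both streams keyed on the same key, e.g.
events.connect(refUpdates).keyBy(_.key, _.key).flatMap(new EnrichWithReference).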

Stavros



On Mon, Oct 2, 2017 at 1:46 PM, Marchant, Hayden 
wrote:

> We have an operator in our streaming application that needs to access
> 'reference data' that is updated by another Flink streaming application.
> This reference data has about ~10,000 entries and has a small footprint.
> This reference data needs to be updated ~ every 100 ms. The required
> latency for  this application is extremely low ( a couple of milliseconds),
> and we are therefore cautious of paying cost of I/O to access the reference
> data remotely. We are currently examining 3 different options for accessing
> this reference data:
>
> 1. Expose the reference data as QueryableState and access it directly from
> the 'client' streaming operator using the QueryableState API
> 2. same as #1, but create an In-memory Java cache of the reference data
> within the operator that is asynchronously updated at a scheduled frequency
> using the QueryableState API
> 3. Output the reference data to Redis, and create an in-memory java cache
> of the reference data within the operator that is asynchronously updated at
> a scheduled frequency using Redis API.
>
> My understanding is that one of the cons of using Queryable state, is that
> if the Flink application that generates the reference data is unavailable,
> the Queryable state will not exist - is that correct?
>
> If we were to use an asynchronously scheduled 'read' from the distributed
> cache, where should it be done? I was thinking of using
> ScheduledExecutorService from within the open method of the Flink operator.
>
> What is the best way to get this done?
>
> Regards,
> Hayden Marchant
>
>


Re: How flink monitor source stream task(Time Trigger) is running?

2017-10-02 Thread yunfan123
Thank you. 
"If SourceFunction.run methods returns without an exception Flink assumes
that it has cleanly shutdown and that there were simply no more elements to
collect/create by this task. "
This sentence solves my confusion.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to deal with blocking calls inside a Sink?

2017-10-02 Thread Federico D'Ambrosio
Hi Timo,

thank you for your response. Just yesterday I tried using the jdbc
connector and unfortunately I found out that HivePreparedStatement and
HiveStatement implementations still don't have an addBatch implementation,
whose interface is being used in the connector. The first dirty solution
that came to my mind was to slightly modify the current JDBCOutputFormat in
order to make a concatenation of insert queries, thus strings, to pass on
to the execute() method of the Statement.

I guess that using AsyncIO would be really the best approach, as you're
saying.

Regards,
Federico

2017-10-02 12:17 GMT+02:00 Timo Walther :

> Hi Federico,
>
> would it help to buffer events first and perform batches of insertions for
> better throughput? I saw some similar work recently here:
> https://tech.signavio.com/2017/postgres-flink-sink
>
> But I would first try the AsyncIO approach, because actually this is a use
> case it was made for.
>
> Regards,
> Timo
>
>
> Am 10/2/17 um 11:53 AM schrieb Federico D'Ambrosio:
>
> Hi, I've implemented a sink for Hive as a RichSinkFunction, but once I've
> integrated it in my current flink job, I noticed that the processing of the
> events slowed down really badly, I guess because of some blocking calls that
> need to be made when interacting with the hive streaming api.
>
> So, what can be done to make it so the throughput doesn't get hurt by
> these calls? I guess increasing (by a lot) the parallelism of the sink
> operator could be a solution, but I'd think it's not really a good one.
>
> Maybe using the AsyncFunction API? Decoupling the sink in a buffer which
> sends the data + operations to be made in the asyncInvoke method of the
> AsyncFunction?
>
> Any suggestion is appreciated.
> Kind regards,
> Federico D'Ambrosio
>
>
>


-- 
Federico D'Ambrosio


Flink Watermark and timing

2017-10-02 Thread Björn Zachrisson
Hi,

I have a question regarding timing of events.

According to;
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#event-time-and-watermarks

All events up to and equal to the watermark should be handled in "the previous
window".

In my case I use event-timestamp.


I'm testing the timing out.

The case is events from 2000-01-01 02:00:00 and up to 2000-01-01 02:20:00
where each event is 2 minutes apart. I try to group the events into 5-minute
windows

2000-01-01 02:00:00 => 2000-01-01 02:05:00
2000-01-01 02:05:00 => 2000-01-01 02:10:00
2000-01-01 02:10:00 => 2000-01-01 02:15:00
2000-01-01 02:15:00 => 2000-01-01 02:20:00

However, the event at the exact time 02:10:00 (946692600) is put in the
window "2000-01-01 02:10:00 => 2000-01-01 02:15:00", which is not according
to what I can read in the documentation.

This is the exact result;
2000-01-01 02:00:00, 946692000
2000-01-01 02:02:00, 946692120
2000-01-01 02:04:00, 946692240

2000-01-01 02:06:00, 946692360
2000-01-01 02:08:00, 946692480

2000-01-01 02:10:00, 946692600
2000-01-01 02:12:00, 946692720
2000-01-01 02:14:00, 946692840

2000-01-01 02:16:00, 946692960
2000-01-01 02:18:00, 946693080

2000-01-01 02:20:00, 946693200

Is this due to the event-time extractor I'm using, or what might be the
case?

Regards
Björn


In-memory cache

2017-10-02 Thread Marchant, Hayden
We have an operator in our streaming application that needs to access 
'reference data' that is updated by another Flink streaming application. This 
reference data has about ~10,000 entries and has a small footprint. This 
reference data needs to be updated ~ every 100 ms. The required latency for  
this application is extremely low ( a couple of milliseconds), and we are 
therefore cautious of paying cost of I/O to access the reference data remotely. 
We are currently examining 3 different options for accessing this reference 
data:

1. Expose the reference data as QueryableState and access it directly from the 
'client' streaming operator using the QueryableState API
2. same as #1, but create an In-memory Java cache of the reference data within 
the operator that is asynchronously updated at a scheduled frequency using the 
QueryableState API
3. Output the reference data to Redis, and create an in-memory java cache of 
the reference data within the operator that is asynchronously updated at a 
scheduled frequency using Redis API. 

My understanding is that one of the cons of using Queryable state, is that if 
the Flink application that generates the reference data is unavailable, the 
Queryable state will not exist - is that correct?

If we were to use an asynchronously scheduled 'read' from the distributed 
cache, where should it be done? I was thinking of using 
ScheduledExecutorService from within the open method of the Flink operator.
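
A minimal sketch of that scheduled-refresh idea (options 2 and 3): the cache
lives inside the operator and is reloaded on a fixed schedule from open();
loadReferenceData stands in for the QueryableState or Redis lookup, and the
record type is just an assumed (key, value) pair:

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class EnrichFromRefreshedCache(loadReferenceData: () => Map[String, Double])
    extends RichMapFunction[(String, Double), (String, Double)] {

  @transient @volatile private var reference: Map[String, Double] = _
  @transient private var scheduler: ScheduledExecutorService = _

  override def open(parameters: Configuration): Unit = {
    reference = loadReferenceData()
    scheduler = Executors.newSingleThreadScheduledExecutor()
    // refresh every 100 ms, off the record-processing thread
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = { reference = loadReferenceData() }
    }, 100, 100, TimeUnit.MILLISECONDS)
  }

  override def map(in: (String, Double)): (String, Double) =
    (in._1, in._2 * reference.getOrElse(in._1, 1.0))

  override def close(): Unit =
    if (scheduler != null) scheduler.shutdownNow()
}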

What is the best way to get this done?

Regards,
Hayden Marchant



Re: How to deal with blocking calls inside a Sink?

2017-10-02 Thread Timo Walther

Hi Federico,

would it help to buffer events first and perform batches of insertions 
for better throughput? I saw some similar work recently here: 
https://tech.signavio.com/2017/postgres-flink-sink
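
A rough sketch of that buffer-and-batch idea as a RichSinkFunction; writeBatch
is a placeholder for the actual blocking Hive/JDBC call, and note that the
buffer is not checkpointed here, so a failure can lose the current batch:

import scala.collection.mutable.ArrayBuffer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class BatchingSink[T](batchSize: Int, writeBatch: Seq[T] => Unit)
    extends RichSinkFunction[T] {

  @transient private var buffer: ArrayBuffer[T] = _

  override def open(parameters: Configuration): Unit =
    buffer = ArrayBuffer.empty[T]

  override def invoke(value: T): Unit = {
    buffer += value
    if (buffer.size >= batchSize) flush()
  }

  override def close(): Unit = flush()

  private def flush(): Unit =
    if (buffer.nonEmpty) {
      writeBatch(buffer)   // one blocking round trip per batch instead of per record
      buffer.clear()
    }
}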


But I would first try the AsyncIO approach, because actually this is a 
use case it was made for.


Regards,
Timo


Am 10/2/17 um 11:53 AM schrieb Federico D'Ambrosio:
Hi, I've implemented a sink for Hive as a RichSinkFunction, but once 
I've integrated it in my current flink job, I noticed that the 
processing of the events slowed down really badly, I guess because of 
some blocking calls that need to be made when interacting with the hive 
streaming api.


So, what can be done to make it so the throughput doesn't get hurt by 
these calls? I guess increasing (by a lot) the parallelism of the sink 
operator could be a solution, but I'd think it's not really a good one.


Maybe using the AsyncFunction API? Decoupling the sink in a buffer 
which sends the data + operations to be made in the asyncInvoke method 
of the AsyncFunction?


Any suggestion is appreciated.
Kind regards,
Federico D'Ambrosio





Re: Enriching data from external source with cache

2017-10-02 Thread Timo Walther

Hi Derek,

maybe the following talk can inspire you, how to do this with joins and 
async IO: https://www.youtube.com/watch?v=Do7C4UJyWCM (around the 17th 
min). Basically, you split the stream and wait for an Async IO result in 
a downstream operator.


But I think having a transient guava cache is not a bad idea, since it 
is only a cache it does not need to be checkpointed and can be recovered 
at any time.
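
To make that concrete, a rough sketch of an enrichment function with a
transient Guava cache, assuming the Flink 1.4+ async API (ResultFuture; in
1.3 the second parameter is an AsyncCollector and you call collect() instead).
Event, Meta and fetchMetaFromRest are placeholders, and error handling is
left out:

import java.util.Collections
import java.util.concurrent.TimeUnit
import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

case class Event(key: String, payload: String)
case class Meta(description: String)

class CachedEnricher(fetchMetaFromRest: String => Meta)
    extends RichAsyncFunction[Event, (Event, Meta)] {

  // transient: rebuilt in open() after every restore, never checkpointed
  @transient private var cache: Cache[String, Meta] = _

  override def open(parameters: Configuration): Unit = {
    cache = CacheBuilder.newBuilder()
      .maximumSize(10000)
      .expireAfterWrite(5, TimeUnit.MINUTES)
      .build[String, Meta]()
  }

  override def asyncInvoke(event: Event, resultFuture: ResultFuture[(Event, Meta)]): Unit = {
    val cached = cache.getIfPresent(event.key)
    if (cached != null) {
      resultFuture.complete(Collections.singleton((event, cached)))
    } else {
      Future {
        val meta = fetchMetaFromRest(event.key)  // blocking REST call, off the main thread
        cache.put(event.key, meta)
        meta
      }.foreach(meta => resultFuture.complete(Collections.singleton((event, meta))))
    }
  }
}

It would then be wired in with AsyncDataStream.unorderedWait (or orderedWait)
around the keyed stream.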


Implementing your own logic in a ProcessFunction is always an option, but 
might require more implementation effort.


Btw. if you feel brave enough, you could also think of contributing a 
stateful async IO. It should not be too much effort to make this work.


Regards,
Timo



Am 9/29/17 um 8:39 PM schrieb Derek VerLee:
My basic problem will sound familiar, I think: I need to enrich 
incoming data using a REST call to an external system for slowly 
evolving metadata, and some cache-based lag is acceptable, so to 
reduce load on the external system and to process more efficiently, I 
would like to implement a cache. The cache would be by key, and I am 
already doing a keyBy for the same key in the job.


Please correct me if I'm wrong:
* Keyed State would be great to store my metadata "cache", Async I/O 
is ideal for pulling from the external system,
but AsyncFunction can not access keyed state ( "Exception: State is 
not supported in rich async functions.") and operators can not share 
state between them.


This leaves me wondering, since side inputs are not here yet, what the 
best (and perhaps most idiomatic) way to approach my problem?


I'd rather keep changes to existing systems minimal for this iteration 
and just minimize impact on them during peaks best I can... systemic 
refactoring and re-architecture will be coming soon (so I'm happy to 
hear thoughts on that as well).


Approaches considered:

1. AsyncFunction with a transient guava cache.  Not ideal ... but 
maybe good enough to get by
2. Using compound message types (oh, if only java had real algebraic 
data types...) and send cache miss messages from some 
CacheEnrichmentMapper (keyed) to some AsyncCacheLoader (not keyed) 
which then backfeeds cache updates to the former via iteration ... i 
don't know why this couldn't work but it feels like a hot mess unless 
there is some way I am not thinking of to do it cleanly
3. One user mentioned on a similar thread loading the data in as 
another DataStream and then using joins, but I'm confused about how 
this would work, it seems to me that joins happen on windows, windows 
pertain to (some notion of) time, what would be my notion of time for 
the slow (maybe years old in some cases) meta-data?

4. Forget about async I/O
5. implement my own "async i/o" using a process function or 
similar ... is this a valid pattern?





Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-10-02 Thread Federico D'Ambrosio
As a followup:

the Flink job currently has an uptime of almost 24 hours, with no
failed checkpoints or restarts, whereas with async snapshots it would have
already crashed 50 or so times.

Regards,
Federico

2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <
federico.dambro...@smartlab.ws>:

> Thank you very much, Gordon.
>
> I'll try to run the job without the asynchronous snapshots first thing.
>
> As for the Event data type: it's a case class with 2 fields: a String ID
> and a composite case class (let's call it RealEvent) containing 3 fields of
> the following types: Information, which is a case class with String fields;
> Coordinates, a nested case class with 2 Doubles; and InstantValues, with 3
> Integers and a DateTime. This DateTime field in InstantValues is the one
> being evaluated in the maxBy (via the InstantValues and RealEvent compareTo
> implementations, because dot notation is not working in Scala as of 1.3.2,
> FLINK-7629), and that was the reason I had to register the
> JodaDateTimeSerializer with Kryo in the first place.
>
> Regards,
> Federico
>
>
>
>
> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai :
>
>> Hi,
>>
>> Thanks for the extra info, it was helpful (I’m not sure why your first
>> logs didn’t have the full trace, though).
>>
>> I spent some time digging through the error trace, and currently have
>> some observations I would like to go through first:
>>
>> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while
>> trying to access the state and making a copy (via serialization) in the
>> CopyOnWriteStateTable.
>> 2. The state that caused the exception seems to be the state of the
>> reducing window function (i.e. the maxBy). The state type should be the
>> same as the records in your `events` DataStream, which seems to be a Scala
>> case class with some nested field that requires Kryo for serialization.
>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when
>> trying to copy that field ..
>>
>> My current guess would perhaps be that the serializer internally used may
>> have been incorrectly shared, which is probably why this exception happens
>> randomly for you.
>> I recall that there were similar issues that occurred before due to the
>> fact that some KryoSerializers aren't thread-safe and was incorrectly
>> shared in Flink.
>>
>> I may need some help from you to be able to look at this a bit more:
>> - Is it possible that you disable asynchronous snapshots and try running
>> this job a bit more to see if the problem still occurs? This is mainly to
>> eliminate my guess on whether or not there is some incorrect serializer
>> usage in the CopyOnWriteStateTable.
>> - Could you let us know what your `events` DataStream records type case
>> class looks like?
>>
>> Also looping in Aljoscha and Stefan here, as they would probably have
>> more insights in this.
>>
>> Cheers,
>> Gordon
>>
>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (
>> federico.dambro...@smartlab.ws) wrote:
>>
>> Hi Gordon,
>>
>> I remembered that I had already seen this kind of exception once during
>> the testing of the current job and fortunately I had the complete
>> stacktrace still saved on my pc:
>>
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>> at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>> at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>> at org.apache.flink.streaming.runtime.io.StreamInputProcess

How to deal with blocking calls inside a Sink?

2017-10-02 Thread Federico D'Ambrosio
Hi, I've implemented a sink for Hive as a RichSinkFunction, but once I've
integrated it in my current flink job, I noticed that the processing of the
events slowed down really badly, I guess because of some blocking calls that
need to be made when interacting with the hive streaming api.

So, what can be done to make it so the throughput doesn't get hurt by these
calls? I guess increasing (by a lot) the parallelism of the sink operator
could be a solution, but I'd think it's not really a good one.

Maybe using the AsyncFunction API? Decoupling the sink in a buffer which
sends the data + operations to be made in the asyncInvoke method of the
AsyncFunction?

Any suggestion is appreciated.
Kind regards,
Federico D'Ambrosio


Re: Windowing isn't applied per key

2017-10-02 Thread Timo Walther

Hi Marcus,

at first glance your pipeline looks correct. It should not be 
executed with a parallelism of one if that is not specified explicitly. Which 
time semantics are you using? If it is event time, I would check your 
timestamp and watermark assignment. Maybe you can also check in the 
web frontend which operator is executed with which parallelism. Btw. 
according to the JavaDocs of reduce(): "Sliding time windows will 
aggregate on the granularity of the slide interval", so it is called 
multiple times.
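
If it helps for checking, here is a minimal, self-contained sketch of event-time
and watermark assignment plus a keyed sliding window (MyEvent, the 10 s
out-of-orderness bound and the window sizes are assumptions, not your job):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class MyEvent(key: String, timestamp: Long, value: Double)

object EventTimeCheck {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val events = env.fromElements(
      MyEvent("a", 1000L, 1.0), MyEvent("a", 2000L, 2.0), MyEvent("b", 1500L, 3.0))

    // event-time timestamps and watermarks must be assigned before keyBy/window
    val withTimestamps = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
        override def extractTimestamp(e: MyEvent): Long = e.timestamp
      })

    withTimestamps
      .keyBy(_.key)
      .timeWindow(Time.minutes(30), Time.minutes(5))
      .reduce((a, b) => if (a.timestamp >= b.timestamp) a else b)  // per-key aggregate
      .print()

    env.execute("event-time check")
  }
}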


Regards,
Timo


Am 9/29/17 um 8:56 PM schrieb Marcus Clendenin:



I have a job that is performing an aggregation over a time window. 
This windowing is supposed to be happening by key, but the output I am 
seeing is creating an overall window on everything coming in. Is this 
happening because I am doing a map of the data before I am running the 
keyBy command? This is a representation of what I am running


val stream = env
  .addSource(kafkaConsumer)

// filter out bad json
val jsonDeserializer = new JSONDeserializationSchema()
val filteredStream = stream.filter(text => {
  try {
    jsonDeserializer.deserialize(text.getBytes)
    true
  } catch {
    case e: Exception => false
  }
})
val kafkaStream = filteredStream.map(text => jsonDeserializer.deserialize(text.getBytes))

// method used to filter json not meeting the expected requests
val filteredJsonStream = filterIncorrectJson(kafkaStream)

// method used to map Json to input object
val mappedStream = mapJsonToObject(filteredJsonStream)

// pull key out of object
val keyedStream = mappedStream.keyBy(_.key)

// add window
val windowedStream = keyedStream.timeWindow(windowSize, windowSlide)

// reduce to aggregates
val reducedStream = windowedStream.reduce(aggregateData())

 



I am pulling in data from Kafka as a String, mapping it to my data 
model and then pulling out the key, applying the time window with a 30 
minute window and a 5 minute slide, and doing an aggregation. I am 
expecting the aggregation to happen on a time window that is separate 
for each key, but it is happening every 5 minutes for all keys.