Re: Question about jobmanager.web.upload.dir

2017-05-17 Thread Mauro Cortellazzi

Thank you Chesnay and Timo for your answers.


On 17/05/2017 16:49, Chesnay Schepler wrote:
I don't know why we delete it either, but my guess is that at one 
point this was a temporary directory that we properly cleaned up, and 
that it was later made configurable.


Currently, this directory must be in the local file system. We could 
change it to also allow non-local paths, which should be fairly easy 
to do. Add a condition that the directory is only cleaned up when it 
wasn't explicitly configured and we should've covered everything.


In regards to the HistoryServer, I mean of course we could include the 
user jar, but I'm wondering about the use cases for it.
It's not like you could "just download it and give it to a 
JobManager"; you would be missing any additional libraries put under 
/lib, as well as the original configuration of the cluster. It would 
also require extra steps (and tools) to identify which dependencies 
are required.


And there are downsides to this, like a significant increase in the 
size of the job archives (along with a duplication of jars for every 
job submission), and the archiving also becomes more complex.


On 17.05.2017 15:52, Timo Walther wrote:

Hey Mauro,

I'm not aware of any reason for that. I loop in Chesnay, maybe he 
knows why. @Chesnay wouldn't it be helpful to also archive the jars 
using the HistoryServer?


Timo


On 17.05.17 at 12:31, Mauro Cortellazzi wrote:

Hi Flink community,

is there a particular reason to delete the jobmanager.web.upload.dir 
content when the JobManager restarts? Could it be interesting to have 
the jars previously uploaded still available when the JM restarts? 
Could the jars be saved to HDFS for HA mode?


Thank you for your support.

Mauro










Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming

2017-05-17 Thread Jia Teoh
I'll take a look into the ProcessFunction, thanks for the suggestion.

-Jia

On Wed, May 17, 2017 at 12:33 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Jia,
>
> Actually just realized you can access the timestamp of records via the
> more powerful `ProcessFunction` [1].
> That’ll be a bit easier to use than implementing your own custom operator,
> which is quite low-level.
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/dev/stream/process_function.html
>
> On 17 May 2017 at 1:45:38 PM, Jia Teoh (jiat...@gmail.com) wrote:
>
> Hi Gordon,
>
> The timestamps are required for application logic. Thank you for
> clarifying the custom operators - seems I mistakenly thought of the
> functions that are passed to the operators rather than the operators
> themselves. AbstractStreamOperator and the other classes you mentioned seem
> like exactly what I'm looking for, so I will take a look into implementing
> a custom one for my use case.
>
> Thank you,
> Jia
>
> On Tue, May 16, 2017 at 9:52 PM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Jia,
>>
>> How exactly do you want to use the Kafka timestamps? Do you want to
>> access them and alter them with new values as the record timestamp? Or do
>> you want to use them for some application logic in your functions?
>>
>> If it's the former, you should be able to do that by using timestamp /
>> watermark extractors. They come with an interface that exposes the current
>> timestamp of the record. For Kafka 0.10, that timestamp would be the Kafka
>> record’s timestamp if it hasn’t been explicitly assigned any other
>> timestamp yet.
>>
>> If it's the latter, then I think currently you have to use custom
>> operators as Robert mentioned.
>> Custom operators are classes that extend the `AbstractStreamOperator`
>> base class and implement one of the `OneInputStreamOperator` or
>> `TwoInputStreamOperator` interfaces.
>> You can take a look at the basic `StreamMap` or `StreamFlatMap` classes
>> for an example, which are the underlying operators for the map and flatMap
>> functions.
>>
>> At the operator level, you’ll have access to a `StreamRecord` in the
>> `processElement` function which wraps the record value (which you get when
>> implementing functions) as well as the internal timestamp that comes with
>> the record.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 17 May 2017 at 4:27:36 AM, Jia Teoh (jiat...@gmail.com) wrote:
>>
>> Hi Robert,
>>
>> Thanks for the reply. I ended up implementing an extension of the Kafka
>> fetcher and consumer so that the deserialization API can include the
>> timestamp field, which is sufficient for my specific use case. I can share
>> the code if desired but it seems like it's an intentional design decision
>> not to expose the timestamp in the deserialization API.
>>
>> I noticed you mentioned that I could use a custom operator to access the
>> record event time. Could you elaborate on what you mean by "operator"? I
>> initially thought that referred to DataStream.map/reduce/etc, but none of
>> those functions provide arguments that can be used to extract the embedded
>> timestamp.
>>
>> Thanks,
>> Jia Teoh
>>
>> On Fri, May 12, 2017 at 9:25 AM, Robert Metzger 
>> wrote:
>>
>>> Hi Jia,
>>>
>>> The Kafka 0.10 connector internally relies on the Kafka09 fetcher, but
>>> it is extensible / pluggable so that also the Kafka 0.9 Fetcher can read
>>> the event timestamps from Kafka 10.
>>> We don't expose the timestamp through the deserialization API, because
>>> we set it internally in Flink. (there is a "hidden" field with each record
>>> containing the event time of the event)
>>>
>>> With a custom operator you can access the event time of a record.
>>>
>>> On Fri, May 12, 2017 at 3:26 AM, Jia Teoh  wrote:
>>>
 Hi,

 Is there a way to retrieve the timestamps that Kafka associates with
 each key-value pair within Flink? I would like to be able to use these as
 values within my application flow, and defining them before or after Kafka
 is not acceptable for the use case due to the latency involved in sending
 or receiving from Kafka.

 It seems that Flink supports Kafka event time (link) but after a brief
 trace it seems that KafkaConsumer010 still relies on the Kafka09Fetcher
 for iterating through each Kafka record and deserializing it. The
 KeyedDeserializationSchema API does not seem to have support for including
 the timestamp as additional metadata (just offset, topic, and partition), so
 something such as JSONKeyValueDeserializationSchema will not return
 the Kafka-specified timestamp.

 For reference, I am using Kafka 0.10.2 and the Flink-Streaming API + Kafka Connector (1.2.1).

Re: [BUG?] Cannot Load User Class on Local Environment

2017-05-17 Thread Matt
Check the repo at [1].

The important step, which I think is what you missed, is running an Ignite
node on your computer so that the Java code, which launches an Ignite client
on the JVM, connects to it and executes Flink on that node in a local
environment.

Be aware "peerClassLoadingEnabled" should be enabled (as in ignite.xml),
because it must match the config on the client node.

If you follow the Readme file, everything is there; if you have any
problem let me know!

Cheers,
Matt

[1] https://github.com/Dromit/FlinkTest

On Wed, May 17, 2017 at 3:49 PM, Matt  wrote:

> Thanks for your help Till.
>
> I will create a self contained test case in a moment and send you the
> link, wait for it.
>
> Cheers,
> Matt
>
> On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann 
> wrote:
>
>> Hi Matt,
>>
>> alright, then we have to look into it again. I tried to run your example,
>> however, it does not seem to be self-contained. Using Ignite 2.0.0 with
>> -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in
>> Ignite#start. In the logs I see the following warning:
>>
>> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
>> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it 
>> is recommended in production to specify at least one address in 
>> TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
>> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
>> WARNING: IP finder returned empty addresses list. Please check IP finder 
>> configuration and make sure multicast works on your network. Will retry 
>> every 2 secs.
>>
>> However, I assume that this is not critical.
>>
>> Maybe you can tell me how I can run your example in order to debug it.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Mon, May 15, 2017 at 10:05 PM, Matt  wrote:
>>
>>> Hi Till,
>>>
>>> I just tried with Flink 1.4 by compiling the current master branch on
>>> GitHub (as of this morning) and I still find the same problem as before. If
>>> I'm not wrong your PR was merged already, so your fixes should be part of
>>> the binary.
>>>
>>> I hope you have time to have a look at the test case in [1].
>>>
>>> Best,
>>> Matt
>>>
>>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>
>>> On Thu, Apr 27, 2017 at 10:09 AM, Matt  wrote:
>>>
 Hi Till,

 Great! Do you know if it's planned to be included in v1.2.x or should
 we wait for v1.3? I'll give it a try as soon as it's merged.

 You're right about this approach launching a mini cluster on each
 Ignite node. That is intentional, as described in my previous message on
 the list [1].

 The idea is to collocate Flink jobs on Ignite nodes, so each dataflow
 only processes the elements stored on the local in-memory database. I get
 the impression this should be much faster than randomly picking a Flink
 node and sending all the data over the network.

 Any insight on this?

 Cheers,
 Matt

 [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nab
 ble.com/Flink-on-Ignite-Collocation-td12780.html

 On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann 
 wrote:

> I just copied my response because my other email address is not
> accepted on the user mailing list.
>
> Hi Matt,
>
> I think Stefan's analysis is correct. I have a PR open [1], where I
> fix the issue with the class loader.
>
> As a side note, by doing what you're doing, you will spawn on each
> Ignite node a new Flink mini cluster. These mini clusters won't communicate
> with each other and run independently. Is this what you intend to do?
>
> [1] https://github.com/apache/flink/pull/3781
>
> Cheers,
> Till
>
> On Wed, Apr 26, 2017 at 11:12 PM, Matt  wrote:
>
>> Let's wait for Till then, I hope he can figure this out.
>>
>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Ok, now the question is also about what classloaders Ignite is
>>> creating and how they are used, but the relevant code line in Flink is
>>> probably in FlinkMiniCluster.scala, line 538 (current master):
>>>
>>>  try {
>>>  JobClient.submitJobAndWait(
>>>clientActorSystem,
>>>configuration,
>>>leaderRetrievalService,
>>>jobGraph,
>>>timeout,
>>>printUpdates,
>>>this.getClass.getClassLoader())
>>> } finally {
>>>if(!useSingleActorSystem) {
>>>  // we have to shutdown the just created actor system
>>>  shutdownJobClientActorSystem(clientActorSystem)
>>>}
>>>  }
>>>
>>>
>>> This is what is executed as part of executing a job through
>>> LocalEnvironment. As we can see, the classloader is set to the 
>>> classloader
>>> of FlinkMiniCluster. Depending on the classloader structure inside 
>>> Ignite,
>>> this cl

Re: [BUG?] Cannot Load User Class on Local Environment

2017-05-17 Thread Matt
Thanks for your help Till.

I will create a self contained test case in a moment and send you the link,
wait for it.

Cheers,
Matt

On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann  wrote:

> Hi Matt,
>
> alright, then we have to look into it again. I tried to run your example,
> however, it does not seem to be self-contained. Using Ignite 2.0.0 with
> -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in
> Ignite#start. In the logs I see the following warning:
>
> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is 
> recommended in production to specify at least one address in 
> TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
> WARNING: IP finder returned empty addresses list. Please check IP finder 
> configuration and make sure multicast works on your network. Will retry every 
> 2 secs.
>
> However, I assume that this is not critical.
>
> Maybe you can tell me how I can run your example in order to debug it.
>
> Cheers,
> Till
> ​
>
> On Mon, May 15, 2017 at 10:05 PM, Matt  wrote:
>
>> Hi Till,
>>
>> I just tried with Flink 1.4 by compiling the current master branch on
>> GitHub (as of this morning) and I still find the same problem as before. If
>> I'm not wrong your PR was merged already, so your fixes should be part of
>> the binary.
>>
>> I hope you have time to have a look at the test case in [1].
>>
>> Best,
>> Matt
>>
>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>
>> On Thu, Apr 27, 2017 at 10:09 AM, Matt  wrote:
>>
>>> Hi Till,
>>>
>>> Great! Do you know if it's planned to be included in v1.2.x or should we
>>> wait for v1.3? I'll give it a try as soon as it's merged.
>>>
>>> You're right about this approach launching a mini cluster on each Ignite
>>> node. That is intentional, as described in my previous message on the list
>>> [1].
>>>
>>> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow
>>> only processes the elements stored on the local in-memory database. I get
>>> the impression this should be much faster than randomly picking a Flink
>>> node and sending all the data over the network.
>>>
>>> Any insight on this?
>>>
>>> Cheers,
>>> Matt
>>>
>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>> ble.com/Flink-on-Ignite-Collocation-td12780.html
>>>
>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann 
>>> wrote:
>>>
 I just copied my response because my other email address is not
 accepted on the user mailing list.

 Hi Matt,

 I think Stefan's analysis is correct. I have a PR open [1], where I fix
 the issue with the class loader.

 As a side note, by doing what you're doing, you will spawn on each
 Ignite node a new Flink mini cluster. These mini clusters won't communicate
 with each other and run independently. Is this what you intend to do?

 [1] https://github.com/apache/flink/pull/3781

 Cheers,
 Till

 On Wed, Apr 26, 2017 at 11:12 PM, Matt  wrote:

> Let's wait for Till then, I hope he can figure this out.
>
> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Ok, now the question is also about what classloaders Ignite is
>> creating and how they are used, but the relevant code line in Flink is
>> probably in FlinkMiniCluster.scala, line 538 (current master):
>>
>>  try {
>>  JobClient.submitJobAndWait(
>>clientActorSystem,
>>configuration,
>>leaderRetrievalService,
>>jobGraph,
>>timeout,
>>printUpdates,
>>this.getClass.getClassLoader())
>> } finally {
>>if(!useSingleActorSystem) {
>>  // we have to shutdown the just created actor system
>>  shutdownJobClientActorSystem(clientActorSystem)
>>}
>>  }
>>
>>
>> This is what is executed as part of executing a job through
>> LocalEnvironment. As we can see, the classloader is set to the 
>> classloader
>> of FlinkMiniCluster. Depending on the classloader structure inside 
>> Ignite,
>> this classloader might not know your user code. What you could do is
>> changing this line in a custom Flink build, changing line 538 for example
>> to Thread.currentThread().getContextClassLoader() and ensuring that
>> the context classloader in the runnable is a classloader that a) knows 
>> the
>> user code and b) is a child of the classloader that knows the Ignite and
>> Flink classes. Notice that this is not a general solution and should not
>> become a general fix.
>>
>> I have heard that Till is about to change some things about local
>> execution, so I included him in CC. Maybe he can provide additional hints
>> how your use case might be better supported

FlinkKafkaConsumer using Kafka-GroupID?

2017-05-17 Thread Valentin
Hi there,

As far as I understood, Flink Kafka Connectors don’t use the consumer group 
management feature from Kafka. Here is the post I got the info from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html#none
 


For some reasons we cannot set up a Flink cluster environment, but we still 
need to ensure high availability, e.g. in case one node goes down the second 
should still keep on running.


My question:
- Is there any chance to run 2 different Flink (standalone) apps consuming 
messages from a single Kafka topic such that each message is consumed only 
once? This is what I could do by using 2 native Kafka consumers within the 
same consumer group.

Many thanks in advance
Valentin 
 

Re: FlinkCEP latency/throughput

2017-05-17 Thread Dean Wampler
On Wed, May 17, 2017 at 10:34 AM, Kostas Kloudas <
k.klou...@data-artisans.com> wrote:

> Hello Alfred,
>
> As a first general remark, Flink was not optimized for multicore
> deployments
> but rather for distributed environments. This implies overheads
> (serialization,
> communication etc), when compared to libs optimized for multicores. So
> there
> may be libraries that are better optimized for those settings if you are
> planning
> to use just a multicore machine.
>
> Now for your suggestion:
>
...

If you're interested in a multi-core option, check out Akka Streams or perhaps
the underlying Actor Model.



-- 
*Dean Wampler, Ph.D.*
VP, Fast Data Engineering



dean.wamp...@lightbend.com
@deanwampler 
https://www.linkedin.com/in/deanwampler
https://github.com/deanwampler


Re: FlinkCEP latency/throughput

2017-05-17 Thread Kostas Kloudas
Hello Alfred,

As a first general remark, Flink was not optimized for multicore deployments 
but rather for distributed environments. This implies overheads (serialization, 
communication etc), when compared to libs optimized for multicores. So there
may be libraries that are better optimized for those settings if you are 
planning 
to use just a multicore machine.

Now for your suggestion:

> On May 16, 2017, at 6:03 PM, Sonex  wrote:
> 
> Hello everyone,
> 
> I am testing some patterns with FlinkCEP and I want to measure latency and
> throughput when using 1 or more processing cores. How can I do that ??
> 
> What I have done so far:
> Latency: Each time an event arrives I store the system time
> (System.currentTimeMillis). When flink calls the select function which means
> we have a full pattern match, again I take the system time. The difference
> of the system time taken from the first event of the complex event and the
> system time taken when the function is called is the latency for now.
> 

1) If you are using event time, then you are also accounting for internal 
buffering and 
ordering of the incoming events.
 
2) I am not sure if measuring the time between the arrival of each element, and 
when 
its matching pattern is emitted makes much sense. In a long pattern, the first 
element
in the matching pattern will wait inevitably longer than the last one, right?

> Throughput: I divide the total number of the events of the dataset by the
> time taken to complete the experiment.
> 
> 

For throughput, you could create a job that contains only your CEP pattern and 
a sink that does nothing, and count the elements read by your source per minute. 
If your source is not the bottleneck, then the CEP part of the pipeline is the 
dominating factor (given that your sink just discards everything, so it cannot 
create backpressure).
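
A minimal sketch of such a measurement harness is below (the ThroughputCounter
class, the Event type and the one-minute interval are illustrative choices, not
Flink-provided APIs):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Pass-through map that logs how many records it sees per minute.
public class ThroughputCounter<T> extends RichMapFunction<T, T> {

    private long count;
    private long windowStart;

    @Override
    public void open(Configuration parameters) {
        count = 0;
        windowStart = System.currentTimeMillis();
    }

    @Override
    public T map(T value) {
        count++;
        long now = System.currentTimeMillis();
        if (now - windowStart >= 60_000) {
            System.out.println("records/min: " + count);
            count = 0;
            windowStart = now;
        }
        return value; // pass-through, negligible overhead
    }
}

// Usage sketch:
//   DataStream<Event> metered = source.map(new ThroughputCounter<>());
//   ... apply the CEP pattern on `metered` ...
//   patternResult.addSink(new DiscardingSink<>()); // discard everything, so the sink cannot backpressure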

I hope this helps,
Kostas

Re: Question about jobmanager.web.upload.dir

2017-05-17 Thread Chesnay Schepler
I don't know why we delete it either, but my guess is that at one point 
this was a temporary directory that we properly cleaned up, and that it 
was later made configurable.


Currently, this directory must be in the local file system. We could 
change it to also allow non-local paths, which should be fairly easy to 
do. Add a condition that the directory is only cleaned up when it wasn't 
explicitly configured and we should've covered everything.


In regards to the HistoryServer, I mean of course we could include the 
user jar, but I'm wondering about the use cases for it.
It's not like you could "just download it and give it to a JobManager"; 
you would be missing any additional libraries put under /lib, as well as 
the original configuration of the cluster. It would also require extra 
steps (and tools) to identify which dependencies are required.


And there are downsides to this, like a significant increase in the size 
of the job archives (along with a duplication of jars for every job 
submission), and the archiving also becomes more complex.


On 17.05.2017 15:52, Timo Walther wrote:

Hey Mauro,

I'm not aware of any reason for that. I loop in Chesnay, maybe he 
knows why. @Chesnay wouldn't it be helpful to also archive the jars 
using the HistoryServer?


Timo


On 17.05.17 at 12:31, Mauro Cortellazzi wrote:

Hi Flink community,

is there a particular reason to delete the jobmanager.web.upload.dir 
content when the JobManager restarts? Could it be interesting to have 
the jars previously uploaded still available when the JM restarts? 
Could the jars be saved to HDFS for HA mode?


Thank you for your support.

Mauro








Re: Question about jobmanager.web.upload.dir

2017-05-17 Thread Timo Walther

Hey Mauro,

I'm not aware of any reason for that. I loop in Chesnay, maybe he knows 
why. @Chesnay wouldn't it be helpful to also archive the jars using the 
HistoryServer?


Timo


On 17.05.17 at 12:31, Mauro Cortellazzi wrote:

Hi Flink community,

is there a particular reason to delete the jobmanager.web.upload.dir 
content when the JobManager restarts? Could it be interesting to have 
the jars previously uploaded still available when the JM restarts? 
Could the jars be saved to HDFS for HA mode?


Thank you for your support.

Mauro





Re: Stateful streaming question

2017-05-17 Thread Flavio Pompermaier
Hi to all,
there are a lot of useful discussion points :)

I'll try to answer to everybody.

@Ankit:

   - Right now we're using Parquet on HDFS to store Thrift objects. Those
   objects are essentially structured like:
      - key
      - alternative_key
      - list of tuples (representing the state of my object)
   - This model could potentially be modeled as a monoid and is very well
   suited for a stateful streaming computation, where updates to a single
   key's state are not as expensive as a call to a db to get the current
   list of tuples and write the updated list back (IMHO). Maybe here I'm
   overestimating Flink streaming capabilities...
   - Serialization should be ok using Thrift, but Flink advises using
   tuples for better performance, so just after reading the data from disk
   (as a ThriftObject) we convert it to its equivalent Tuple3 representation.
   - Since I currently use Flink to ingest data, which (in the end) means
   adding tuples to my objects, it would be perfect to have an "online"
   state of the grouped tuples in order to:
      - add/remove tuples to my object very quickly
      - from time to time, scan the whole online data (or a part of it) and
      "translate" it into one or more JSON indices (and put them into
      Elasticsearch)

@Fabian:
You're right that batch processes are not very well suited to work with
services that can fail...if a remote call in a map function fails, the whole
batch job fails...this should be less problematic with streaming because
there's checkpointing, and with async IO it should be possible to add
some retry/backoff policies in order to not overload remote services like
a db or solr/es indices (maybe it's not already there, but it should be
possible to add). Am I wrong?

@Kostas:

From what I understood, Queryable State is useful for gets...what if I need
to scan the entire db? For us it could be better to periodically dump the
state to RocksDB or HDFS but, as I already said, I'm not sure if it is safe
to start a batch job that reads the dumped data while, in the meantime, a
possible update of this dump could happen...is there any potential problem
for data consistency (indeed, tuples within grouped objects have references
to other objects' keys)?

Best,
Flavio

On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas <
k.klou...@data-artisans.com> wrote:

> Hi Flavio,
>
> For setting the retries, unfortunately there is no such setting yet and,
> if I am not wrong, in case of a failure of a request,
> an exception will be thrown and the job will restart. I am also including
> Till in the thread as he may know better.
>
> For consistency guarantees and concurrency control, this depends on your
> underlying backend. But if you want to
> have end-to-end control, then you could do as Ankit suggested at his point
> 3), i.e have a single job for the whole pipeline
>  (if this fits your needs of course). This will allow you to set your own
> “precedence” rules for your operations.
>
> Now finally, there is no way currently to expose the state of a job to
> another job. The way to do so is either Queryable
> State, or writing to a Sink. If the problem for having one job is that you
> emit one element at a time, you can always group
> elements together and emit downstream less often, in batches.
>
> Finally, if  you need 2 jobs, you can always use a hybrid solution where
> you keep your current state in Flink, and you dump it
> to a Sink that is queryable once per week for example. The Sink then can
> be queried at any time, and data will be at most one
> week old.
>
> Thanks,
> Kostas
>
> On May 17, 2017, at 9:35 AM, Fabian Hueske  wrote:
>
> Hi Ankit, just a brief comment on the "batch job is easier than streaming
> job" argument. I'm not sure about that.
> I can see that just the batch job might seem easier to implement, but this
> is only one part of the whole story. The operational side of using batch is
> more complex IMO.
> You need a tool to ingest your stream, you need storage for the ingested
> data, you need a periodic scheduler to kick off your batch job, and you need
> to take care of failures if something goes wrong.
> In the streaming case, this is not needed or the framework does it for you.
>
> Just my 2 cents, Fabian
>
> 2017-05-16 20:58 GMT+02:00 Jain, Ankit :
>
>> Hi Flavio,
>>
>> While you wait on an update from Kostas, wanted to understand the
>> use-case better and share my thoughts-
>>
>>
>>
>> 1)   Why is current batch mode expensive? Where are you persisting
>> the data after updates? Way I see it by moving to Flink, you get to use
>> RocksDB(a key-value store) that makes your lookups faster – probably right
>> now you are using a non-indexed store like S3 maybe?
>>
>> So, gain is coming from moving to a better persistence store suited to
>> your use-case than from batch->streaming. Maybe consider just going with a
>> different data store.
>>
>> IMHO, stream should only be used if you

Re: State in Custom Tumble Window Class

2017-05-17 Thread rizhashmi
Yes... is there any possibility to access Flink state variables in the
WindowAssigner.assignWindows method?






Re: State in Custom Tumble Window Class

2017-05-17 Thread rizhashmi
Thanks Walther,

I am not keeping my window forever. If the event arrives after the grace
period (lateness), I update the event time to the current time, or say the
last event time. That essentially solves the infinite-storage issue.

Is 1.3 not stable yet?






Re: Timer fault tolerance in Flink

2017-05-17 Thread Kostas Kloudas
Hi Rahul,

The timers are fault tolerant and their timestamp is the absolute value of when 
to fire.
This means that if you are at time t = 10 and you register a timer “10 ms from 
now”, the timer will have a firing timestamp of 20.
This is checkpointed, so the “new machine” that takes over the failed task, 
will have the timer with timestamp 20.

So when the timer will fire depends on the “new machine”, and it may differ 
from what would have happened on the previous machine in the following cases:

- For processing time, in case your new machine (the one that takes over 
  the failed task) has a clock that is out of sync with the 
  previous machine that set the timer to 20.
- For event time, given that Flink does not checkpoint watermarks, the 
  timer will fire when the watermark on the new machine surpasses 
  the timer's timestamp.
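
To make the absolute-timestamp point concrete, here is a small sketch (the
Event type and the 10 ms offset are only illustrative):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TimerSketch extends ProcessFunction<Event, Event> {

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        // "10 ms from now" becomes an absolute firing timestamp; it is this
        // absolute value that ends up in the checkpoint.
        long firingTime = ctx.timerService().currentProcessingTime() + 10;
        ctx.timerService().registerProcessingTimeTimer(firingTime);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) {
        // After a failover this fires on the machine that took over, once its
        // clock (or, for event-time timers, its watermark) passes `timestamp`.
    }
}

// Timers are scoped to keys, so the function is applied on a keyed stream:
//   input.keyBy(...).process(new TimerSketch());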

I hope this helps,
Kostas

> On May 17, 2017, at 12:36 PM, Rahul Kavale  wrote:
> 
> I am looking at timers in apache flink and wanted to confirm if the timers in 
> flink are fault tolerant.
> 
> eg. when a timer registered with processFunction, of say 20 sec is running on 
> a node and after 15 seconds (since the timer started), the node failed for 
> some reason. Does flink guarantee that the timer resume on another node? if 
> it does resume does it consider only the remaining time for the timer ie 5 
> sec in this case?
> 
> Thanks & Regards,
> 
> Rahul
> 



Timer fault tolerance in Flink

2017-05-17 Thread Rahul Kavale
I am looking at timers in Apache Flink and wanted to confirm whether the
timers in Flink are fault tolerant.

E.g. a timer of, say, 20 sec is registered with a ProcessFunction and is
running on a node, and after 15 seconds (since the timer started) the node
fails for some reason. Does Flink guarantee that the timer resumes on
another node? If it does resume, does it consider only the remaining time
for the timer, i.e. 5 sec in this case?

Thanks & Regards,

Rahul


Question about jobmanager.web.upload.dir

2017-05-17 Thread Mauro Cortellazzi

Hi Flink community,

is there a particular reason to delete the jobmanager.web.upload.dir 
content when the JobManager restarts? Could it be interesting to have 
the jars previously uploaded still available when the JM restarts? 
Could the jars be saved to HDFS for HA mode?


Thank you for your support.

Mauro



Bushy plan execution

2017-05-17 Thread Mathias Eriksen Otkjær
Hi


We have attempted to create a bushy logical plan for Flink, but we are not 
certain whether it actually gets executed in parallel or in a linear fashion 
inside Flink (we are certain that it works, as we get the same results now as 
we did using SQL in the Table API). How can we confirm that such a plan does 
indeed get executed in the parallel fashion that we expect and on different 
TaskManagers? Should we just trust the framework to execute our query plan with 
operations running in parallel, or should we somehow manually set the 
parallelism of the individual join operations inside Flink?
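
One way to check this is to inspect the optimized plan and to set the
parallelism of the individual joins explicitly. A rough sketch (the data sets
and the join logic below are placeholders, not your actual plan):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BushyPlanInspection {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> a = env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));
        DataSet<Tuple2<Integer, String>> b = env.fromElements(Tuple2.of(1, "x"), Tuple2.of(2, "y"));

        DataSet<Tuple2<Integer, String>> joined = a.join(b)
            .where(0)
            .equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                @Override
                public Tuple2<Integer, String> join(Tuple2<Integer, String> left, Tuple2<Integer, String> right) {
                    return Tuple2.of(left.f0, left.f1 + right.f1);
                }
            })
            .setParallelism(4); // explicit parallelism for this join only

        joined.print();

        // Alternatively, dump the optimized plan as JSON (instead of executing) and
        // load it into Flink's plan visualizer to inspect the branch structure:
        // System.out.println(env.getExecutionPlan());
    }
}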


Thanks,

Jesper and Mathias


Re: Stateful streaming question

2017-05-17 Thread Kostas Kloudas
Hi Flavio,

For setting the retries, unfortunately there is no such setting yet and, if I 
am not wrong, in case of a failure of a request, 
an exception will be thrown and the job will restart. I am also including Till 
in the thread as he may know better.

For consistency guarantees and concurrency control, this depends on your 
underlying backend. But if you want to 
have end-to-end control, then you could do as Ankit suggested at his point 3), 
i.e. have a single job for the whole pipeline
 (if this fits your needs of course). This will allow you to set your own 
“precedence” rules for your operations.

Now finally, there is no way currently to expose the state of a job to another 
job. The way to do so is either Queryable
State, or writing to a Sink. If the problem for having one job is that you emit 
one element at a time, you can always group
elements together and emit downstream less often, in batches.
 
Finally, if  you need 2 jobs, you can always use a hybrid solution where you 
keep your current state in Flink, and you dump it 
to a Sink that is queryable once per week for example. The Sink then can be 
queried at any time, and data will be at most one 
week old.

Thanks,
Kostas

> On May 17, 2017, at 9:35 AM, Fabian Hueske  wrote:
> 
> Hi Ankit, just a brief comment on the "batch job is easier than streaming job"
> argument. I'm not sure about that. 
> I can see that just the batch job might seem easier to implement, but this is 
> only one part of the whole story. The operational side of using batch is more 
> complex IMO. 
> You need a tool to ingest your stream, you need storage for the ingested 
> data, you need a periodic scheduler to kick off your batch job, and you need 
> to take care of failures if something goes wrong. 
> In the streaming case, this is not needed or the framework does it for you.
> 
> Just my 2 cents, Fabian
> 
> 2017-05-16 20:58 GMT+02:00 Jain, Ankit  >:
> Hi Flavio,
> 
> While you wait on an update from Kostas, wanted to understand the use-case 
> better and share my thoughts-
> 
>  
> 
> 1)   Why is current batch mode expensive? Where are you persisting the 
> data after updates? Way I see it by moving to Flink, you get to use RocksDB(a 
> key-value store) that makes your lookups faster – probably right now you are 
> using a non-indexed store like S3 maybe?
> 
> So, gain is coming from moving to a better persistence store suited to your 
> use-case than from batch->streaming. Maybe consider just going with a 
> different data store.
> 
> IMHO, stream should only be used if you really want to act on the new events 
> in real-time. It is generally harder to get a streaming job correct than a 
> batch one.
> 
>  
> 
> 2)   If current setup is expensive due to serialization-deserialization 
> then that should be fixed by moving to a faster format (maybe AVRO? - I don’t 
> have a lot of expertise in that). I don’t see how that problem will go away 
> with Flink – so still need to handle serialization.
> 
>  
> 
> 3)   Even if you do decide to move to Flink – I think you can do this 
> with one job, two jobs are not needed. At every incoming event, check the 
> previous state and update/output to kafka or whatever data store you are 
> using.
> 
>  
> 
>  
> 
> Thanks
> 
> Ankit
> 
>  
> 
> From: Flavio Pompermaier <pomperma...@okkam.it>
> Date: Tuesday, May 16, 2017 at 9:31 AM
> To: Kostas Kloudas <k.klou...@data-artisans.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Stateful streaming question
> 
>  
> 
> Hi Kostas,
> 
> thanks for your quick response. 
> 
> I also thought about using Async IO, I just need to figure out how to 
> correctly handle parallelism and number of async requests. 
> 
> However that's probably the way to go..is it possible also to set a number of 
> retry attempts/backoff when the async request fails (maybe due to a too busy 
> server)?
> 
>  
> 
> For the second part I think it's ok to persist the state into RocksDB or 
> HDFS, my question is indeed about that: is it safe to start reading (with 
> another Flink job) from RocksDB or HDFS having an updatable state "pending" 
> on it? Should I ensure that state updates are not possible until the other 
> Flink job hasn't finish to read the persisted data?
> 
>  
> 
> And another question...I've tried to draft such a processand basically I have 
> the following code:
> 
>  
> 
> DataStream groupedObj = tuples.keyBy(0)
> 
> .flatMap(new RichFlatMapFunction() {
> 
>  
> 
>   private transient ValueState state;
> 
>  
> 
>   @Override
> 
>   public void flatMap(Tuple4 t, Collector out) throws 
> Exception {
> 
> MyGroupedObj current = state.value();
> 
> if (current == null) {
> 
>   current = new MyGroupedObj();
> 
> }
> 
> 
> 
>current.addTuple(t);
> 
> ... 
> 
> state.update(current);
> 
> o
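
The quoted snippet above is cut off by the archive and its generic type
parameters were stripped. A self-contained reconstruction of the same keyed
ValueState pattern, assuming the input tuples are Tuple4<String, String,
String, String>, looks roughly like this:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
    .flatMap(new RichFlatMapFunction<Tuple4<String, String, String, String>, MyGroupedObj>() {

        private transient ValueState<MyGroupedObj> state;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<MyGroupedObj> descriptor =
                new ValueStateDescriptor<>("test", TypeInformation.of(MyGroupedObj.class));
            state = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Tuple4<String, String, String, String> t, Collector<MyGroupedObj> out)
                throws Exception {
            MyGroupedObj current = state.value();
            if (current == null) {
                current = new MyGroupedObj();
            }
            current.addTuple(t);
            state.update(current);
            out.collect(current); // emits on every update, which is exactly the concern raised above
        }
    });

groupedObj.print();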

Re: State in Custom Tumble Window Class

2017-05-17 Thread Timo Walther

Hi,

in general, a class-level variable is not managed by Flink if it is not 
defined as state or the function does not implement the ListCheckpointed 
interface. Allowing infinite lateness also means that your window 
content has to be stored infinitely. I'm not sure if I understand your 
implementation correctly, but overriding classes that were not intended 
to be overridden does not seem to be a good solution. With Flink 1.3 there 
will be side outputs that allow you to retrieve all late data as a stream 
so that no elements are lost at all: WindowedStream#sideOutputLateData; maybe 
this is an option for you.
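
A small sketch of how that could look with the Flink 1.3 side output API (the
Event type, result type, key selector, window size, and window function are
placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<MyResult> windowed = events
    .keyBy(e -> e.getKey())
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .allowedLateness(Time.minutes(10))
    .sideOutputLateData(lateTag)            // late records go here instead of being dropped
    .apply(new MyWindowFunction());

DataStream<Event> lateEvents = windowed.getSideOutput(lateTag);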


Hope that helps.

Timo


On 16.05.17 at 23:25, rizhashmi wrote:

I have a requirement not to reject events, even if they are late (after the
maximum allowed lateness). The way I achieve this is by overriding the tumbling
window class and updating the event time to the last event time if the event is
late, and for identification I attach an additional column in the db as
(currentevent/msPerHour - eventtime/msPerHour).

currentevent is a class-level variable in the tumbling window class.


My problem statement is: in case of a full runtime crash, can Flink recover
the tumbling window state?

If yes, how?










Re: [BUG?] Cannot Load User Class on Local Environment

2017-05-17 Thread Till Rohrmann
Hi Matt,

alright, then we have to look into it again. I tried to run your example,
however, it does not seem to be self-contained. Using Ignite 2.0.0 with
-DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in
Ignite#start. In the logs I see the following warning:

May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses
(it is recommended in production to specify at least one address in
TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: IP finder returned empty addresses list. Please check IP
finder configuration and make sure multicast works on your network.
Will retry every 2 secs.

However, I assume that this is not critical.

Maybe you can tell me how I can run your example in order to debug it.

Cheers,
Till
​

On Mon, May 15, 2017 at 10:05 PM, Matt  wrote:

> Hi Till,
>
> I just tried with Flink 1.4 by compiling the current master branch on
> GitHub (as of this morning) and I still find the same problem as before. If
> I'm not wrong your PR was merged already, so your fixes should be part of
> the binary.
>
> I hope you have time to have a look at the test case in [1].
>
> Best,
> Matt
>
> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>
> On Thu, Apr 27, 2017 at 10:09 AM, Matt  wrote:
>
>> Hi Till,
>>
>> Great! Do you know if it's planned to be included in v1.2.x or should we
>> wait for v1.3? I'll give it a try as soon as it's merged.
>>
>> You're right about this approach launching a mini cluster on each Ignite
>> node. That is intentional, as described in my previous message on the list
>> [1].
>>
>> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow
>> only processes the elements stored on the local in-memory database. I get
>> the impression this should be much faster than randomly picking a Flink
>> node and sending all the data over the network.
>>
>> Any insight on this?
>>
>> Cheers,
>> Matt
>>
>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/Flink-on-Ignite-Collocation-td12780.html
>>
>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann 
>> wrote:
>>
>>> I just copied my response because my other email address is not accepted
>>> on the user mailing list.
>>>
>>> Hi Matt,
>>>
>>> I think Stefan's analysis is correct. I have a PR open [1], where I fix
>>> the issue with the class loader.
>>>
>>> As a side note, by doing what you're doing, you will spawn on each
>>> Ignite node a new Flink mini cluster. These mini clusters won't communicate
>>> with each other and run independently. Is this what you intend to do?
>>>
>>> [1] https://github.com/apache/flink/pull/3781
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt  wrote:
>>>
 Let's wait for Till then, I hope he can figure this out.

 On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
 s.rich...@data-artisans.com> wrote:

> Ok, now the question is also about what classloaders Ignite is
> creating and how they are used, but the relevant code line in Flink is
> probably in FlinkMiniCluster.scala, line 538 (current master):
>
>  try {
>  JobClient.submitJobAndWait(
>clientActorSystem,
>configuration,
>leaderRetrievalService,
>jobGraph,
>timeout,
>printUpdates,
>this.getClass.getClassLoader())
> } finally {
>if(!useSingleActorSystem) {
>  // we have to shutdown the just created actor system
>  shutdownJobClientActorSystem(clientActorSystem)
>}
>  }
>
>
> This is what is executed as part of executing a job through
> LocalEnvironment. As we can see, the classloader is set to the classloader
> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
> this classloader might not know your user code. What you could do is
> changing this line in a custom Flink build, changing line 538 for example
> to Thread.currentThread().getContextClassLoader() and ensuring that
> the context classloader in the runnable is a classloader that a) knows 
> the
> user code and b) is a child of the classloader that knows the Ignite and
> Flink classes. Notice that this is not a general solution and should not
> become a general fix.
>
> I have heard that Till is about to change some things about local
> execution, so I included him in CC. Maybe he can provide additional hints
> how your use case might be better supported in the upcoming Flink 1.3.
>
> Best,
> Stefan
>
> Am 25.04.2017 um 22:50 schrieb Matt :
>
> I updated the code a little bit for clarity, now the line #56
> mentioned in my previous message is line #25.
>
> In summary the error I'm getting is this:
>
> ---
> Caused by: org.apache.flink.streaming.run
>

Re: Stateful streaming question

2017-05-17 Thread Fabian Hueske
Hi Ankit, just a brief comment on the "batch job is easier than streaming
job" argument. I'm not sure about that.
I can see that just the batch job might seem easier to implement, but this
is only one part of the whole story. The operational side of using batch is
more complex IMO.
You need a tool to ingest your stream, you need storage for the ingested
data, you need a periodic scheduler to kick off your batch job, and you need
to take care of failures if something goes wrong.
In the streaming case, this is not needed or the framework does it for you.

Just my 2 cents, Fabian

2017-05-16 20:58 GMT+02:00 Jain, Ankit :

> Hi Flavio,
>
> While you wait on an update from Kostas, wanted to understand the use-case
> better and share my thoughts-
>
>
>
> 1)   Why is current batch mode expensive? Where are you persisting
> the data after updates? Way I see it by moving to Flink, you get to use
> RocksDB(a key-value store) that makes your lookups faster – probably right
> now you are using a non-indexed store like S3 maybe?
>
> So, gain is coming from moving to a better persistence store suited to
> your use-case than from batch->streaming. Maybe consider just going with a
> different data store.
>
> IMHO, stream should only be used if you really want to act on the new
> events in real-time. It is generally harder to get a streaming job correct
> than a batch one.
>
>
>
> 2)   If current setup is expensive due to
> serialization-deserialization then that should be fixed by moving to a
> faster format (maybe AVRO? - I don’t have a lot of expertise in that). I
> don’t see how that problem will go away with Flink – so still need to
> handle serialization.
>
>
>
> 3)   Even if you do decide to move to Flink – I think you can do this
> with one job, two jobs are not needed. At every incoming event, check the
> previous state and update/output to kafka or whatever data store you are
> using.
>
>
>
>
>
> Thanks
>
> Ankit
>
>
>
> From: Flavio Pompermaier 
> Date: Tuesday, May 16, 2017 at 9:31 AM
> To: Kostas Kloudas 
> Cc: user 
> Subject: Re: Stateful streaming question
>
>
>
> Hi Kostas,
>
> thanks for your quick response.
>
> I also thought about using Async IO, I just need to figure out how to
> correctly handle parallelism and number of async requests.
>
> However that's probably the way to go..is it possible also to set a number
> of retry attempts/backoff when the async request fails (maybe due to a too
> busy server)?
>
>
>
> For the second part I think it's ok to persist the state into RocksDB or
> HDFS, my question is indeed about that: is it safe to start reading (with
> another Flink job) from RocksDB or HDFS having an updatable state "pending"
> on it? Should I ensure that state updates are not possible until the other
> Flink job hasn't finish to read the persisted data?
>
>
>
> And another question...I've tried to draft such a processand basically I
> have the following code:
>
>
>
> DataStream groupedObj = tuples.keyBy(0)
>
> .flatMap(new RichFlatMapFunction() {
>
>
>
>   private transient ValueState state;
>
>
>
>   @Override
>
>   public void flatMap(Tuple4 t, Collector out)
> throws Exception {
>
> MyGroupedObj current = state.value();
>
> if (current == null) {
>
>   current = new MyGroupedObj();
>
> }
>
> 
>
>current.addTuple(t);
>
> ...
>
> state.update(current);
>
> out.collect(current);
>
>   }
>
>
>
>   @Override
>
>   public void open(Configuration config) {
>
> ValueStateDescriptor descriptor =
>
>   new ValueStateDescriptor<>(
> "test",TypeInformation.of(MyGroupedObj.class));
>
>   state = getRuntimeContext().getState(descriptor);
>
>   }
>
> });
>
> groupedObj.print();
>
>
>
> but obviously this way I emit the updated object on every update while,
> actually, I just want to persist the ValueState somehow (and make it
> available to another job that runs one/moth for example). Is that possible?
>
>
>
>
>
> On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
> Hi Flavio,
>
>
>
> From what I understand, for the first part you are correct. You can use
> Flink’s internal state to keep your enriched data.
>
> In fact, if you are also querying an external system to enrich your data,
> it is worth looking at the AsyncIO feature:
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/
> asyncio.html
>
>
>
> Now for the second part, currently in Flink you cannot iterate over all
> registered keys for which you have state. A pointer
>
> to look at the may be useful is the queryable state:
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/
> queryable_state.html
>
>
>
> This is still an experimental feature, but let us know your opinion if you
> use it.
>
>
>
> Finally, an

Re: Using Kafka 0.10.x timestamps as a record value in Flink Streaming

2017-05-17 Thread Tzu-Li (Gordon) Tai
Hi Jia,

Actually just realized you can access the timestamp of records via the more 
powerful `ProcessFunction` [1].
That’ll be a bit easier to use than implementing your own custom operator, 
which is quite low-level.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
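
For example, a minimal sketch (Event is a placeholder for the deserialized
Kafka record type):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class AttachKafkaTimestamp extends ProcessFunction<Event, Tuple2<Event, Long>> {

    @Override
    public void processElement(Event value, Context ctx, Collector<Tuple2<Event, Long>> out) {
        // ctx.timestamp() is the record's internal timestamp; with the Kafka 0.10
        // consumer this is the Kafka record timestamp, unless it was re-assigned upstream.
        out.collect(Tuple2.of(value, ctx.timestamp()));
    }
}

// usage (older Flink versions may require keying the stream first):
//   stream.keyBy(...).process(new AttachKafkaTimestamp());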

On 17 May 2017 at 1:45:38 PM, Jia Teoh (jiat...@gmail.com) wrote:

Hi Gordon,

The timestamps are required for application logic. Thank you for clarifying the 
custom operators - seems I mistakenly thought of the functions that are passed 
to the operators rather than the operators themselves. AbstractStreamOperator 
and the other classes you mentioned seem like exactly what I'm looking for, so 
I will take a look into implementing a custom one for my use case.

Thank you,
Jia

On Tue, May 16, 2017 at 9:52 PM, Tzu-Li (Gordon) Tai  
wrote:
Hi Jia,

How exactly do you want to use the Kafka timestamps? Do you want to access them 
and alter them with new values as the record timestamp? Or do you want to use 
them for some application logic in your functions?

If it's the former, you should be able to do that by using timestamp / watermark 
extractors. They come with an interface that exposes the current timestamp of 
the record. For Kafka 0.10, that timestamp would be the Kafka record’s 
timestamp if it hasn’t been explicitly assigned any other timestamp yet.
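
As a sketch (Event and the 10-second out-of-orderness bound are placeholders;
previousElementTimestamp is the timestamp already attached to the record, i.e.
the Kafka 0.10 timestamp here):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class KeepKafkaTimestamps implements AssignerWithPeriodicWatermarks<Event> {

    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, previousElementTimestamp);
        return previousElementTimestamp; // keep the Kafka-assigned timestamp
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 10_000);
    }
}

// usage: stream.assignTimestampsAndWatermarks(new KeepKafkaTimestamps());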

If it's the latter, then I think currently you have to use custom operators as 
Robert mentioned.
Custom operators are classes that extend the `AbstractStreamOperator` base 
class and implement one of the `OneInputStreamOperator` or `TwoInputStreamOperator` interfaces.
You can take a look at the basic `StreamMap` or `StreamFlatMap` classes for an 
example, which are the underlying operators for the map and flatMap functions.

At the operator level, you’ll have access to a `StreamRecord` in the 
`processElement` function which wraps the record value (which you get when 
implementing functions) as well as the internal timestamp that comes with the 
record.
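
A rough sketch of such an operator (Event is a placeholder, and the exact
operator interfaces may differ slightly between Flink versions):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class TimestampPairingOperator extends AbstractStreamOperator<Tuple2<Event, Long>>
        implements OneInputStreamOperator<Event, Tuple2<Event, Long>> {

    @Override
    public void processElement(StreamRecord<Event> element) throws Exception {
        // The StreamRecord carries both the value and the internal timestamp.
        output.collect(new StreamRecord<>(
            Tuple2.of(element.getValue(), element.getTimestamp()),
            element.getTimestamp()));
    }
}

// usage (operator name and output type information are up to you):
//   stream.transform("attach-timestamp",
//       TypeInformation.of(new TypeHint<Tuple2<Event, Long>>() {}),
//       new TimestampPairingOperator());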

Cheers,
Gordon


On 17 May 2017 at 4:27:36 AM, Jia Teoh (jiat...@gmail.com) wrote:

Hi Robert,

Thanks for the reply. I ended up implementing an extension of the Kafka fetcher 
and consumer so that the deserialization API can include the timestamp field, 
which is sufficient for my specific use case. I can share the code if desired 
but it seems like it's an intentional design decision not to expose the 
timestamp in the deserialization API.

I noticed you mentioned that I could use a custom operator to access the record 
event time. Could you elaborate on what you mean by "operator"? I initially 
thought that referred to DataStream.map/reduce/etc, but none of those functions 
provide arguments that can be used to extract the embedded timestamp. 

Thanks,
Jia Teoh

On Fri, May 12, 2017 at 9:25 AM, Robert Metzger  wrote:
Hi Jia,

The Kafka 0.10 connector internally relies on the Kafka09 fetcher, but it is 
extensible / pluggable so that also the Kafka 0.9 Fetcher can read the event 
timestamps from Kafka 10.
We don't expose the timestamp through the deserialization API, because we set 
it internally in Flink. (there is a "hidden" field with each record containing 
the event time of the event)

With a custom operator you can access the event time of a record.

On Fri, May 12, 2017 at 3:26 AM, Jia Teoh  wrote:
Hi,

Is there a way to retrieve the timestamps that Kafka associates with each 
key-value pair within Flink? I would like to be able to use these as values 
within my application flow, and defining them before or after Kafka is not 
acceptable for the use case due to the latency involved in sending or receiving 
from Kafka.

It seems that Flink supports Kafka event time (link) but after a brief trace it 
seems that KafkaConsumer010 still relies on the Kafka09Fetcher for iterating 
through each Kafka record and deserializing it. The KeyedDeserializationSchema 
api does not seem to have support for including timestamp as additional 
metadata (just offset, topic, and partition) so something such as 
JSONKeyValueDeserializationSchema will not return the Kafka-specified timestamp.

For reference, I am using Kafka 0.10.2 and the Flink-Streaming API + Kafka 
Connector (1.2.1).

Thanks,
Jia Teoh