Re: Kafka state backend?

2016-04-07 Thread Zach Cox
Ah I didn't see that, thanks for the link! Glad this is being discussed.

On Thu, Apr 7, 2016 at 5:06 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Zach,
> I'm afraid someone already beat you to it :-)
> https://issues.apache.org/jira/browse/FLINK-3692
>
> In the issue we touch on some of the difficulties with this that stem from
> the differences in the guarantees that Flink and Samza try to give.
>
> Cheers,
> Aljoscha
>
> On Tue, 5 Apr 2016 at 22:24 Zach Cox <zcox...@gmail.com> wrote:
>
>> Hi - as clarified in another thread [1] stateful operators store all of
>> their current state in the backend on each checkpoint. Just curious if
>> Kafka topics with log compaction have ever been considered as a possible
>> state backend?
>>
>> Samza [2] uses RocksDB as a local state store, with all writes also going
>> to a log-compacted Kafka topic for persistence. This seems like it might
>> also be a good alternative backend in Flink for jobs with large amounts of
>> long-lasting state. You would give up some throughput (due to Kafka
>> producer writes) but there would be almost nothing to do on checkpoints.
>>
>> Just wanted to propose the idea and see if it has already been discussed,
>> or maybe I'm missing some reasons why it would be a bad idea.
>>
>> Thanks,
>> Zach
>>
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-state-stored-in-backend-and-deleting-old-checkpoint-state-td5927.html
>> [2]
>> http://samza.apache.org/learn/documentation/0.10/container/state-management.html#local-state-in-samza
>>
>>


Re: Back Pressure details

2016-04-06 Thread Zach Cox
Yeah, I don't think that's the case for my setup either :)  I wrote a simple
Flink job that just consumes from Kafka and sinks the events/sec rate to
Graphite. That job consumes from Kafka at a rate several orders of magnitude
higher than the job that also sinks to Elasticsearch. As you said, the
downstream back pressure must also be slowing down consumption from Kafka,
even though the job manager UI doesn't show HIGH back pressure on the Kafka
source.
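
For reference, the measuring job is roughly the following sketch (topic name,
broker/ZooKeeper/Graphite hosts, group id and metric path are placeholders;
the Graphite sink just writes the plaintext protocol over a socket):

import java.io.PrintWriter
import java.net.Socket
import java.util.Properties

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaRateToGraphite {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092")
    props.setProperty("zookeeper.connect", "zookeeper:2181")
    props.setProperty("group.id", "rate-measurement")

    env
      .addSource(new FlinkKafkaConsumer08[String]("events", new SimpleStringSchema(), props))
      .map(_ => 1L)
      .timeWindowAll(Time.seconds(1))   // events per second across the whole stream
      .reduce(_ + _)
      .addSink(new GraphiteSink("graphite", 2003, "flink.events_per_sec"))

    env.execute("kafka-rate-to-graphite")
  }
}

// Minimal Graphite plaintext-protocol sink: "<path> <value> <epoch-seconds>"
class GraphiteSink(host: String, port: Int, path: String) extends RichSinkFunction[Long] {
  @transient private var out: PrintWriter = _

  override def open(parameters: Configuration): Unit = {
    out = new PrintWriter(new Socket(host, port).getOutputStream, true)
  }

  override def invoke(count: Long): Unit = {
    out.println(s"$path $count ${System.currentTimeMillis() / 1000}")
  }

  override def close(): Unit = if (out != null) out.close()
}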

Thanks for the details!

-Zach


On Wed, Apr 6, 2016 at 2:37 PM Ufuk Celebi <u...@apache.org> wrote:

> Ah sorry, I forgot to mention that in the docs.
>
> The way data is pulled from Kafka bypasses Flink's task thread: the
> topic is consumed in a separate thread while the task thread just
> waits. That's why you don't see any back pressure reported for Kafka
> sources, even though I would expect your Kafka source to actually be
> back pressured as well.
>
> In theory it is possible that the speed at which data is consumed in
> the source "matches" the speed of the back pressured operator
> downstream. That would result in a non-back-pressured source with a
> back pressured downstream task. But I don't think that's the case for
> your setup. ;-)
>
> On Wed, Apr 6, 2016 at 9:27 PM, Zach Cox <zcox...@gmail.com> wrote:
> > The new back pressure docs are great, thanks Ufuk! I'm sure those will
> help
> > others as well.
> >
> > In the Source => A => B => Sink example, if A and B show HIGH back
> pressure,
> > should Source also show HIGH? In my case it's a Kafka source and
> > Elasticsearch sink. I know currently our Kafka can provide data at a much
> > higher rate than our Elasticsearch can ingest (I'm working on scaling up
> > Elasticsearch), just curious why the Kafka source wouldn't also show HIGH
> > back pressure.
> >
> > Thanks,
> > Zach
> >
> >
> > On Wed, Apr 6, 2016 at 5:36 AM Ufuk Celebi <u...@apache.org> wrote:
> >>
> >> Hey Zach,
> >>
> >> just added some documentation, which will be available in ~ 30 mins
> >> here:
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/back_pressure_monitoring.html
> >>
> >> If you think that something is missing there, I would appreciate some
> >> feedback. :-)
> >>
> >> Back pressure is determined by repeatedly calling getStackTrace() on
> >> the task Threads executing the job. By default, 100 times with 50ms
> >> delay between calls. If the task thread is stuck in an internal method
> >> call requesting buffers from the network stack, this indicates back
> >> pressure.
> >>
> >> The ratio you see tells you how many of the stack traces were stuck in
> >> that method (e.g. 1 out of 100) and the status codes just group those
> >> in a (hopefully) reasonable way (<= 0.10 is OK, <= 0.5 is LOW, > 0.5
> >> is HIGH).
> >>
> >> If you have a task with back pressure this means that it is producing
> >> data faster than the network can consume, for example because the
> >> downstream operator is slow or the network can't handle it. Your
> >> Source => A => B => Sink example suggests that the sink is slowing
> >> down/back pressuring B, which is in turn slowing down/back pressuring
> >> A.
> >>
> >> Does this help?
> >>
> >> Keep in mind though that it is not a rock solid approach and there is
> >> a chance that we miss the back pressure indicators or always sample
> >> when the task is requesting buffers (which is happening all the
> >> time). It often works better at the extremes, e.g. when there is no
> >> back pressure at all or very high back pressure.
> >>
> >> – Ufuk
> >>
> >>
> >> On Tue, Apr 5, 2016 at 10:47 PM, Zach Cox <zcox...@gmail.com> wrote:
> >> > Hi - I'm trying to identify bottlenecks in my Flink streaming job, and
> >> > am
> >> > curious about the Back Pressure view in the job manager web UI. If
> there
> >> > are
> >> > already docs for Back Pressure please feel free to just point me to
> >> > those.
> >> > :)
> >> >
> >> > When "Sampling in progress..." is displayed, what exactly is
> happening?
> >> >
> >> > What do the values in the Ratio column for each Subtask mean exactly?
> >> >
> >> > What does Status such as OK, High, etc mean? Are these determined from
> >> > the
> >> > Ratio values?
> >> >
> >> > If my job graph looks like Source => A => B => Sink, with Back
> Pressure
> >> > OK
> >> > for Source and Sink, but High for A and B, what does that suggest?
> >> >
> >> > Thanks,
> >> > Zach
> >> >
>


Re: Checkpoint state stored in backend, and deleting old checkpoint state

2016-04-06 Thread Zach Cox
Hi Stephan - incremental checkpointing sounds really interesting and
useful, I look forward to trying it out.

Thanks,
Zach


On Wed, Apr 6, 2016 at 4:39 AM Stephan Ewen <se...@apache.org> wrote:

> Hi Zach!
>
> I am working on incremental checkpointing, hope to have it in the master
> in the next weeks.
>
> The current approach is to have a full self-contained checkpoint every
> once in a while, and have incremental checkpoints most of the time. Having
> a full checkpoint every now and then spares you from re-applying an endless
> set of deltas on recovery.
>
> Related to that is also making the checkpointing asynchronous, so that
> normal operations do not see any disruption any more.
>
> Greetings,
> Stephan
>
> On Tue, Apr 5, 2016 at 10:25 PM, Zach Cox <zcox...@gmail.com> wrote:
>
>> Thanks for the details Konstantin and Ufuk!
>>
>>
>> On Tue, Apr 5, 2016 at 2:39 PM Konstantin Knauf <
>> konstantin.kn...@tngtech.com> wrote:
>>
>>> Hi Ufuk,
>>>
>>> I thought so, but I am not sure when and where ;) I will let you know,
>>> if I come across it again.
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> On 05.04.2016 21:10, Ufuk Celebi wrote:
>>> > Hey Zach and Konstantin,
>>> >
>>> > Great questions and answers. We can try to make this more explicit in
>>> the docs.
>>> >
>>> > On Tue, Apr 5, 2016 at 8:54 PM, Konstantin Knauf
>>> > <konstantin.kn...@tngtech.com> wrote:
>>> >> To my knowledge flink takes care of deleting old checkpoints (I think
>>> it
>>> >> says so in the documentation about savepoints.). In my experience
>>> >> though, if a job is cancelled or crashes, the checkpoint files are
>>> >> usually not cleaned up. So some housekeeping might be necessary.
>>> >
>>> > Regarding cleanup: currently only the latest successful checkpoint is
>>> retained.
>>> >
>>> > On graceful shutdown, all checkpoints should be cleaned up as far as I
>>> > know. Savepoints always have to be cleaned up manually.
>>> >
>>> > On crashes, the checkpoint state has to be cleaned up manually (if the
>>> > JVM shut down hooks did not run).
>>> >
>>> > @Konstantin: did you have lingering state without crashes?
>>> >
>>> > – Ufuk
>>> >
>>>
>>> --
>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>
>>
>


Re: Back Pressure details

2016-04-06 Thread Zach Cox
The new back pressure docs are great, thanks Ufuk! I'm sure those will help
others as well.

In the Source => A => B => Sink example, if A and B show HIGH back
pressure, should Source also show HIGH? In my case it's a Kafka source and
Elasticsearch sink. I know currently our Kafka can provide data at a much
higher rate than our Elasticsearch can ingest (I'm working on scaling up
Elasticsearch), just curious why the Kafka source wouldn't also show HIGH
back pressure.

Thanks,
Zach


On Wed, Apr 6, 2016 at 5:36 AM Ufuk Celebi <u...@apache.org> wrote:

> Hey Zach,
>
> just added some documentation, which will be available in ~ 30 mins
> here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/back_pressure_monitoring.html
>
> If you think that something is missing there, I would appreciate some
> feedback. :-)
>
> Back pressure is determined by repeatedly calling getStackTrace() on
> the task Threads executing the job. By default, 100 times with 50ms
> delay between calls. If the task thread is stuck in an internal method
> call requesting buffers from the network stack, this indicates back
> pressure.
>
> The ratio you see tells you how many of the stack traces were stuck in
> that method (e.g. 1 out of 100) and the status codes just group those
> in a (hopefully) reasonable way (<= 0.10 is OK, <= 0.5 is LOW, > 0.5
> is HIGH).
>
> If you have a task with back pressure this means that it is producing
> data faster than the network can consume, for example because the
> downstream operator is slow or the network can't handle it. Your
> Source => A => B => Sink example suggests that the sink is slowing
> down/back pressuring B, which is in turn slowing down/back pressuring
> A.
>
> Does this help?
>
> Keep in mind though that it is not a rock solid approach and there is
> a chance that we miss the back pressure indicators or always sample
> when the task is requesting buffers (which is happening all the
> time). It often works better at the extremes, e.g. when there is no
> back pressure at all or very high back pressure.
>
> – Ufuk
>
>
> On Tue, Apr 5, 2016 at 10:47 PM, Zach Cox <zcox...@gmail.com> wrote:
> > Hi - I'm trying to identify bottlenecks in my Flink streaming job, and am
> > curious about the Back Pressure view in the job manager web UI. If there
> are
> > already docs for Back Pressure please feel free to just point me to
> those.
> > :)
> >
> > When "Sampling in progress..." is displayed, what exactly is happening?
> >
> > What do the values in the Ratio column for each Subtask mean exactly?
> >
> > What does Status such as OK, High, etc mean? Are these determined from
> the
> > Ratio values?
> >
> > If my job graph looks like Source => A => B => Sink, with Back Pressure
> OK
> > for Source and Sink, but High for A and B, what does that suggest?
> >
> > Thanks,
> > Zach
> >
>
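
As a mental model of the sampling Ufuk describes above, here is a simplified
sketch (this is not Flink's actual implementation, and the class/method names
used to detect "waiting for a network buffer" are assumptions):

object BackPressureSampler {
  sealed trait Status
  case object OK extends Status
  case object LOW extends Status
  case object HIGH extends Status

  // Take `samples` stack traces of the task thread, `delayMillis` apart, and
  // return the fraction of samples that were stuck requesting a buffer.
  def sampleRatio(taskThread: Thread, samples: Int = 100, delayMillis: Long = 50): Double = {
    val stuck = (1 to samples).count { _ =>
      val requestingBuffer = taskThread.getStackTrace.exists { frame =>
        frame.getClassName.endsWith("LocalBufferPool") &&
          frame.getMethodName.startsWith("requestBuffer")
      }
      Thread.sleep(delayMillis)
      requestingBuffer
    }
    stuck.toDouble / samples
  }

  // Same grouping as the web UI: <= 0.10 is OK, <= 0.5 is LOW, above that HIGH.
  def classify(ratio: Double): Status =
    if (ratio <= 0.10) OK else if (ratio <= 0.5) LOW else HIGH
}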


Back Pressure details

2016-04-05 Thread Zach Cox
Hi - I'm trying to identify bottlenecks in my Flink streaming job, and am
curious about the Back Pressure view in the job manager web UI. If there
are already docs for Back Pressure please feel free to just point me to
those. :)

When "Sampling in progress..." is displayed, what exactly is happening?

What do the values in the Ratio column for each Subtask mean exactly?

What does Status such as OK, High, etc mean? Are these determined from the
Ratio values?

If my job graph looks like Source => A => B => Sink, with Back Pressure OK
for Source and Sink, but High for A and B, what does that suggest?

Thanks,
Zach


Kafka state backend?

2016-04-05 Thread Zach Cox
Hi - as clarified in another thread [1] stateful operators store all of
their current state in the backend on each checkpoint. Just curious if
Kafka topics with log compaction have ever been considered as a possible
state backend?

Samza [2] uses RocksDB as a local state store, with all writes also going
to a log-compacted Kafka topic for persistence. This seems like it might
also be a good alternative backend in Flink for jobs with large amounts of
long-lasting state. You would give up some throughput (due to Kafka
producer writes) but there would be almost nothing to do on checkpoints.
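
To make the idea concrete, here is a rough sketch of the Samza-style pattern
as it could look inside a Flink job (this is not a proposal for the actual
state backend interface; topic name, brokers and types are made up, and
rebuilding the map from the changelog on restart is left out):

import java.util.Properties

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.mutable

// (userId, amount) in, (userId, runningTotal) out; every state change is also
// appended to a compacted "state-changelog" topic keyed by userId.
class ChangelogBackedCount extends RichFlatMapFunction[(String, Long), (String, Long)] {
  @transient private var producer: KafkaProducer[String, String] = _
  private val state = mutable.Map.empty[String, Long]

  override def open(parameters: Configuration): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
  }

  override def flatMap(in: (String, Long), out: Collector[(String, Long)]): Unit = {
    val (key, amount) = in
    val updated = state.getOrElse(key, 0L) + amount
    state(key) = updated
    // With log compaction, Kafka eventually keeps only the latest value per key.
    producer.send(new ProducerRecord("state-changelog", key, updated.toString))
    out.collect((key, updated))
  }

  override def close(): Unit = if (producer != null) producer.close()
}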

Just wanted to propose the idea and see if it has already been discussed,
or maybe I'm missing some reasons why it would be a bad idea.

Thanks,
Zach

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-state-stored-in-backend-and-deleting-old-checkpoint-state-td5927.html
[2]
http://samza.apache.org/learn/documentation/0.10/container/state-management.html#local-state-in-samza


Checkpoint state stored in backend, and deleting old checkpoint state

2016-04-05 Thread Zach Cox
Hi - I have some questions regarding Flink's checkpointing, specifically
related to storing state in the backends.

So let's say an operator in a streaming job is building up some state. When
it receives barriers from all of its input streams, does it store *all* of
its state to the backend? I think that is what the docs [1] and paper [2]
imply, but want to make sure. In other words, if the operator contains
100MB of state, and the backend is HDFS, does the operator copy all 100MB
of state to HDFS during the checkpoint?

Following on this example, say the operator is a global window and is
storing some state for each unique key observed in the stream of messages
(e.g. userId). Assume that over time, the number of observed unique keys
grows, so the size of the state also grows (the window state is never
purged). Is the entire operator state at the time of each checkpoint stored
to the backend? So that over time, the size of the state stored for each
checkpoint to the backend grows? Or is the state stored to the backend
somehow just the state that changed in some way since the last checkpoint?

Are old checkpoint states in the backend ever deleted / cleaned up? That
is, if all of the state for checkpoint n in the backend is all that is
needed to restore a failed job, then all state for all checkpoints m < n
should not be needed any more, right? Can all of those old checkpoints be
deleted from the backend? Does Flink do this?

Thanks,
Zach

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
[2] http://arxiv.org/abs/1506.08603


Re: Upserts with Flink-elasticsearch

2016-03-28 Thread Zach Cox
Hi Madhukar - with the current Elasticsearch sink in Flink 1.0.0 [1], I
don't think an upsert is possible, since IndexRequestBuilder can only
return an IndexRequest.

In Flink 1.1, the Elasticsearch 2.x sink [2] provides a RequestIndexer [3]
to which you can pass an UpdateRequest to do an upsert.

Thanks,
Zach

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/connectors/elasticsearch.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html
[3]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.html
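
For concreteness, a rough sketch of such an upsert (this is my reading of the
connector API in [2] and [3], not official docs; index, type and field names
are placeholders):

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch2.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.update.UpdateRequest

import scala.collection.JavaConverters._

case class PageView(userId: String, count: Long)

// Builds an upsert per element: update the doc if it exists, otherwise insert it.
class UpsertSinkFunction extends ElasticsearchSinkFunction[PageView] {
  override def process(element: PageView, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    val doc = Map[String, AnyRef]("count" -> java.lang.Long.valueOf(element.count)).asJava
    val request = new UpdateRequest("my-index", "page-view", element.userId)
      .doc(doc)      // applied when the document already exists
      .upsert(doc)   // used as the initial document otherwise
    indexer.add(request)
  }
}

The sink itself is then constructed with this function plus the usual cluster
configuration and transport addresses.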


On Mon, Mar 28, 2016 at 2:18 PM Madhukar Thota 
wrote:

> Is it possible to do Upsert with existing flink-elasticsearch connector
> today?
>


Accumulators checkpointed?

2016-03-15 Thread Zach Cox
Are accumulators stored in checkpoint state? If a job fails and restarts,
are all accumulator values lost, or are they restored from checkpointed
state?

Thanks,
Zach


Re: asm IllegalArgumentException with 1.0.0

2016-03-14 Thread Zach Cox
Yes, pretty much - we use sbt to run the job in a local environment, not
IntelliJ, but it should be the same thing. We were also seeing that exception
when running unit tests locally. We did not see the exception when assembling
a fat jar and submitting it to a remote Flink cluster.

It seems like the flink-connector-elasticsearch jar should not have shaded asm
classes in it. Maybe that jar in Maven Central was built incorrectly?

We worked around this by just not depending on that elasticsearch connector
at all, since we wrote our own connector for Elasticsearch 2.x.
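
In build.sbt terms the workaround is just leaving it out, or excluding it
where it comes in transitively (artifact names here are from memory and worth
double-checking; the internal library name is made up):

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % "1.0.0",
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.0.0"
  // "org.apache.flink" %% "flink-connector-elasticsearch" % "1.0.0"  // omitted: ships shaded asm classes
)

// or, when an internal library drags it in transitively:
libraryDependencies += ("com.example" %% "internal-lib" % "1.2.3")
  .exclude("org.apache.flink", "flink-connector-elasticsearch_2.10")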

-Zach


On Mon, Mar 14, 2016 at 2:03 PM Andrew Whitaker <
andrew.whita...@braintreepayments.com> wrote:

> We're having the same issue (we also have a dependency on
> flink-connector-elasticsearch). It's only happening to us in IntelliJ
> though. Is this the case for you as well?
>
> On Thu, Mar 10, 2016 at 3:20 PM, Zach Cox <zcox...@gmail.com> wrote:
>
>> After some poking around I noticed
>> that flink-connector-elasticsearch_2.10-1.0.0.jar contains shaded asm
>> classes. If I remove that dependency from my project then I do not get the
>> IllegalArgumentException.
>>
>>
>> On Thu, Mar 10, 2016 at 11:51 AM Zach Cox <zcox...@gmail.com> wrote:
>>
>>> Here are the jars on the classpath when I try to run our Flink job in a
>>> local environment (via `sbt run`):
>>>
>>>
>>> https://gist.githubusercontent.com/zcox/0992aba1c517b51dc879/raw/7136ec034c2beef04bd65de9f125ce3796db511f/gistfile1.txt
>>>
>>> There are many transitive dependencies pulled in from internal library
>>> projects that probably need to be cleaned out. Maybe we are including
>>> something that conflicts? Or maybe something important is being excluded?
>>>
>>> Are the asm classes included in Flink jars in some shaded form?
>>>
>>> Thanks,
>>> Zach
>>>
>>>
>>> On Thu, Mar 10, 2016 at 5:06 AM Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Dependency shading changed a bit between RC4 and RC5 - maybe a
>>>> different minor ASM version is now included in the "test" scope.
>>>>
>>>> Can you share the dependencies of the problematic project?
>>>>
>>>> On Thu, Mar 10, 2016 at 12:26 AM, Zach Cox <zcox...@gmail.com> wrote:
>>>>
>>>>> I also noticed when I try to run this application in a local
>>>>> environment, I get the same IllegalArgumentException.
>>>>>
>>>>> When I assemble this application into a fat jar and run it on a Flink
>>>>> cluster using the CLI tools, it seems to run fine.
>>>>>
>>>>> Maybe my local classpath is missing something that is provided on the
>>>>> Flink task managers?
>>>>>
>>>>> -Zach
>>>>>
>>>>>
>>>>> On Wed, Mar 9, 2016 at 5:16 PM Zach Cox <zcox...@gmail.com> wrote:
>>>>>
>>>>>> Hi - after upgrading to 1.0.0, I'm getting this exception now in a
>>>>>> unit test:
>>>>>>
>>>>>>IllegalArgumentException:   (null:-1)
>>>>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
>>>>>> Source)
>>>>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
>>>>>> Source)
>>>>>>
>>>>>> org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
>>>>>>
>>>>>> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
>>>>>>
>>>>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
>>>>>>
>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
>>>>>>
>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
>>>>>>
>>>>>> The line that causes that exception is just adding
>>>>>> a FlinkKafkaConsumer08 source.
>>>>>>
>>>>>> ClassVisitor [1] seems to throw that IllegalArgumentException when it
>>>>>> is not given a valid api version number, but InnerClosureFinder [2] looks
>>>>>> fine to me.
>>>>>>
>>>>>> Any idea what might be causing this? This unit test worked fine with
>>>>>> 1.0.0-rc0 jars.
>>>>>>
>>>>>> Thanks,
>>>>>> Zach
>>>>>>
>>>>>> [1]
>>>>>> http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java
>>>>>> [2]
>>>>>> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279
>>>>>>
>>>>>>
>>>>>>
>>>>
>
>
> --
> Andrew Whitaker | andrew.whita...@braintreepayments.com
> --
> Note: this information is confidential. It is prohibited to share, post
> online or otherwise publicize without Braintree's prior written consent.
>


Re: asm IllegalArgumentException with 1.0.0

2016-03-10 Thread Zach Cox
After some poking around I noticed
that flink-connector-elasticsearch_2.10-1.0.0.jar contains shaded asm
classes. If I remove that dependency from my project then I do not get the
IllegalArgumentException.


On Thu, Mar 10, 2016 at 11:51 AM Zach Cox <zcox...@gmail.com> wrote:

> Here are the jars on the classpath when I try to run our Flink job in a
> local environment (via `sbt run`):
>
>
> https://gist.githubusercontent.com/zcox/0992aba1c517b51dc879/raw/7136ec034c2beef04bd65de9f125ce3796db511f/gistfile1.txt
>
> There are many transitive dependencies pulled in from internal library
> projects that probably need to be cleaned out. Maybe we are including
> something that conflicts? Or maybe something important is being excluded?
>
> Are the asm classes included in Flink jars in some shaded form?
>
> Thanks,
> Zach
>
>
> On Thu, Mar 10, 2016 at 5:06 AM Stephan Ewen <se...@apache.org> wrote:
>
>> Dependency shading changed a bit between RC4 and RC5 - maybe a different
>> minor ASM version is now included in the "test" scope.
>>
>> Can you share the dependencies of the problematic project?
>>
>> On Thu, Mar 10, 2016 at 12:26 AM, Zach Cox <zcox...@gmail.com> wrote:
>>
>>> I also noticed when I try to run this application in a local
>>> environment, I get the same IllegalArgumentException.
>>>
>>> When I assemble this application into a fat jar and run it on a Flink
>>> cluster using the CLI tools, it seems to run fine.
>>>
>>> Maybe my local classpath is missing something that is provided on the
>>> Flink task managers?
>>>
>>> -Zach
>>>
>>>
>>> On Wed, Mar 9, 2016 at 5:16 PM Zach Cox <zcox...@gmail.com> wrote:
>>>
>>>> Hi - after upgrading to 1.0.0, I'm getting this exception now in a unit
>>>> test:
>>>>
>>>>IllegalArgumentException:   (null:-1)
>>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
>>>> Source)
>>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
>>>> Source)
>>>>
>>>> org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
>>>>
>>>> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
>>>>
>>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
>>>>
>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
>>>>
>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
>>>>
>>>> The line that causes that exception is just adding
>>>> a FlinkKafkaConsumer08 source.
>>>>
>>>> ClassVisitor [1] seems to throw that IllegalArgumentException when it
>>>> is not given a valid api version number, but InnerClosureFinder [2] looks
>>>> fine to me.
>>>>
>>>> Any idea what might be causing this? This unit test worked fine with
>>>> 1.0.0-rc0 jars.
>>>>
>>>> Thanks,
>>>> Zach
>>>>
>>>> [1]
>>>> http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java
>>>> [2]
>>>> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279
>>>>
>>>>
>>>>
>>


Re: asm IllegalArgumentException with 1.0.0

2016-03-10 Thread Zach Cox
Here are the jars on the classpath when I try to run our Flink job in a
local environment (via `sbt run`):

https://gist.githubusercontent.com/zcox/0992aba1c517b51dc879/raw/7136ec034c2beef04bd65de9f125ce3796db511f/gistfile1.txt

There are many transitive dependencies pulled in from internal library
projects that probably need to be cleaned out. Maybe we are including
something that conflicts? Or maybe something important is being excluded?

Are the asm classes included in Flink jars in some shaded form?

Thanks,
Zach


On Thu, Mar 10, 2016 at 5:06 AM Stephan Ewen <se...@apache.org> wrote:

> Dependency shading changed a bit between RC4 and RC5 - maybe a different
> minor ASM version is now included in the "test" scope.
>
> Can you share the dependencies of the problematic project?
>
> On Thu, Mar 10, 2016 at 12:26 AM, Zach Cox <zcox...@gmail.com> wrote:
>
>> I also noticed when I try to run this application in a local environment,
>> I get the same IllegalArgumentException.
>>
>> When I assemble this application into a fat jar and run it on a Flink
>> cluster using the CLI tools, it seems to run fine.
>>
>> Maybe my local classpath is missing something that is provided on the
>> Flink task managers?
>>
>> -Zach
>>
>>
>> On Wed, Mar 9, 2016 at 5:16 PM Zach Cox <zcox...@gmail.com> wrote:
>>
>>> Hi - after upgrading to 1.0.0, I'm getting this exception now in a unit
>>> test:
>>>
>>>IllegalArgumentException:   (null:-1)
>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
>>> Source)
>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
>>> Source)
>>>
>>> org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
>>>
>>> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
>>>
>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
>>>
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
>>>
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
>>>
>>> The line that causes that exception is just adding
>>> a FlinkKafkaConsumer08 source.
>>>
>>> ClassVisitor [1] seems to throw that IllegalArgumentException when it is
>>> not given a valid api version number, but InnerClosureFinder [2] looks fine
>>> to me.
>>>
>>> Any idea what might be causing this? This unit test worked fine with
>>> 1.0.0-rc0 jars.
>>>
>>> Thanks,
>>> Zach
>>>
>>> [1]
>>> http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java
>>> [2]
>>> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279
>>>
>>>
>>>
>


Re: ExecutionConfig.enableTimestamps() in 1.0.0

2016-03-10 Thread Zach Cox
I see the new Event Time docs page, thanks for fixing that! I like the
additional explanation of event time and watermarks.

I also updated our TimestampExtractors to AssignerWithPeriodicWatermarks as
described in [1]. I like the separation between periodic and punctuated
watermark assigners in the new API - it's definitely more clear how each
one operates clearly.
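
For anyone else doing the same migration, ours now look roughly like this
(event type and the 5s bound are just for illustration):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

case class Event(userId: String, timestamp: Long)

// Periodic assigner: extract each element's event-time timestamp and, whenever
// Flink asks (at the auto-watermark interval), emit a watermark trailing the
// max timestamp seen so far by a fixed bound.
class BoundedLatenessAssigner(maxLatenessMillis: Long = 5000L)
    extends AssignerWithPeriodicWatermarks[Event] {

  private var maxTimestamp = Long.MinValue

  override def extractTimestamp(element: Event, previousElementTimestamp: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, element.timestamp)
    element.timestamp
  }

  override def getCurrentWatermark(): Watermark =
    new Watermark(if (maxTimestamp == Long.MinValue) Long.MinValue else maxTimestamp - maxLatenessMillis)
}

// usage: stream.assignTimestampsAndWatermarks(new BoundedLatenessAssigner())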

Thanks,
Zach

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_timestamps_watermarks.html


On Thu, Mar 10, 2016 at 3:33 AM Ufuk Celebi <u...@apache.org> wrote:

> Just removed the page. Triggering a new docs build...
>
> On Thu, Mar 10, 2016 at 10:22 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > Then Stephan should have removed the old doc when adding the new one… :-)
> >> On 10 Mar 2016, at 10:20, Ufuk Celebi <u...@apache.org> wrote:
> >>
> >> Just talked with Stephan: the document you are referring to is stale.
> >> Can you check out this one here:
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_time.html
> >>
> >> – Ufuk
> >>
> >>
> >> On Thu, Mar 10, 2016 at 10:17 AM, Ufuk Celebi <u...@apache.org> wrote:
> >>> I've added this to the migration guide here:
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
> >>>
> >>> Feel free to add any other API changes that are missing there.
> >>>
> >>> – Ufuk
> >>>
> >>>
> >>> On Thu, Mar 10, 2016 at 10:13 AM, Aljoscha Krettek <
> aljos...@apache.org> wrote:
> >>>> Hi,
> >>>> you’re right, this should be changed to
> “setStreamTimeCharacteristic(EventTime)” in the doc.
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>> On 09 Mar 2016, at 23:21, Zach Cox <zcox...@gmail.com> wrote:
> >>>>>
> >>>>> Hi - I'm upgrading our app to 1.0.0 and noticed ExecutionConfig no
> longer has an enableTimestamps() method. Do we just not need to call that
> at all now?
> >>>>>
> >>>>> The docs still say to call it [1] - do they just need to be updated?
> >>>>>
> >>>>> Thanks,
> >>>>> Zach
> >>>>>
> >>>>> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/time.html
> >>>>>
> >>>>
> >
>


Re: asm IllegalArgumentException with 1.0.0

2016-03-09 Thread Zach Cox
I also noticed that when I try to run this application in a local
environment, I get the same IllegalArgumentException.

When I assemble this application into a fat jar and run it on a Flink
cluster using the CLI tools, it seems to run fine.

Maybe my local classpath is missing something that is provided on the Flink
task managers?

-Zach


On Wed, Mar 9, 2016 at 5:16 PM Zach Cox <zcox...@gmail.com> wrote:

> Hi - after upgrading to 1.0.0, I'm getting this exception now in a unit
> test:
>
>IllegalArgumentException:   (null:-1)
> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
> Source)
> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
> Source)
>
> org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
>
> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
>
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
>
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
>
> The line that causes that exception is just adding a FlinkKafkaConsumer08
> source.
>
> ClassVisitor [1] seems to throw that IllegalArgumentException when it is
> not given a valid api version number, but InnerClosureFinder [2] looks fine
> to me.
>
> Any idea what might be causing this? This unit test worked fine with
> 1.0.0-rc0 jars.
>
> Thanks,
> Zach
>
> [1]
> http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java
> [2]
> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279
>
>
>


asm IllegalArgumentException with 1.0.0

2016-03-09 Thread Zach Cox
Hi - after upgrading to 1.0.0, I'm getting this exception now in a unit
test:

   IllegalArgumentException:   (null:-1)
org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)

The line that causes that exception is just adding a FlinkKafkaConsumer08
source.

ClassVisitor [1] seems to throw that IllegalArgumentException when it is
not given a valid api version number, but InnerClosureFinder [2] looks fine
to me.

Any idea what might be causing this? This unit test worked fine with
1.0.0-rc0 jars.

Thanks,
Zach

[1]
http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java
[2]
https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279


ExecutionConfig.enableTimestamps() in 1.0.0

2016-03-09 Thread Zach Cox
Hi - I'm upgrading our app to 1.0.0 and noticed ExecutionConfig no longer
has an enableTimestamps() method. Do we just not need to call that at all
now?

The docs still say to call it [1] - do they just need to be updated?

Thanks,
Zach

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/time.html


Re: InetSocketAddress is not serializable

2016-03-04 Thread Zach Cox
I ran into the same issue upgrading to Elasticsearch 2, here's how I solved
it:

https://gist.github.com/zcox/59e486be7aeeca381be0#file-elasticsearch2sink-java-L110
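
The short version of the approach: keep only serializable values (host strings
and ports) as fields, and build the TransportClient and its
InetSocketTransportAddress in open() on the task manager. A stripped-down
sketch (class and setting names per the Elasticsearch 2.x Java client as I
remember it; index/type are placeholders):

import java.net.InetAddress

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress

// Only (String, Int) pairs are serialized with the job graph; the
// non-serializable transport addresses are built in open() on the worker.
class Es2Sink(clusterName: String, hosts: Seq[(String, Int)]) extends RichSinkFunction[String] {
  @transient private var client: TransportClient = _

  override def open(parameters: Configuration): Unit = {
    val settings = Settings.settingsBuilder().put("cluster.name", clusterName).build()
    client = TransportClient.builder().settings(settings).build()
    hosts.foreach { case (host, port) =>
      client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port))
    }
  }

  override def invoke(json: String): Unit = {
    client.prepareIndex("my-index", "my-type").setSource(json).get()
  }

  override def close(): Unit = if (client != null) client.close()
}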

-Zach


On Fri, Mar 4, 2016 at 7:30 AM HungChang  wrote:

> Hi,
>
> I'm building the connector for ElasticSearch2. One main issue for me now is
> that
>
> List<TransportAddress> transports = new ArrayList<TransportAddress>();
> transports.add(new InetSocketTransportAddress(new
> InetSocketAddress(TransportAddress, 9300)));
>
> throws
> java.io.NotSerializableException:
> org.elasticsearch.common.transport.InetSocketTransportAddress
>
> ES2 changed to this approach, and InetSocketAddress implements
> java.io.Serializable rather than Writable (is that the problem?).
> What would be the suggested way to handle this? Should I implement a
> serializable InetSocketAddress?
>
> Best,
>
> Sendoh
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/InetSocketAddress-is-not-serializable-tp5296.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Watermarks with repartition

2016-02-26 Thread Zach Cox
Thanks for the confirmation Aljoscha! I wrote up results from my little
experiment: https://github.com/zcox/flink-repartition-watermark-example

-Zach


On Fri, Feb 26, 2016 at 2:58 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> yes, your description is spot on!
>
> Cheers,
> Aljoscha
> > On 26 Feb 2016, at 00:19, Zach Cox <zcox...@gmail.com> wrote:
> >
> > I think I found the information I was looking for:
> >
> > RecordWriter broadcasts each emitted watermark to all outgoing channels
> [1].
> >
> > StreamInputProcessor tracks the max watermark received on each incoming
> channel separately, and computes the task's watermark as the min of all
> incoming watermarks [2].
> >
> > Is this an accurate summary of Flink's watermark propagation?
> >
> > So in my previous example, each window count task is building up a count
> for each window based on incoming event's timestamp, and when all incoming
> watermarks have progressed beyond the end of the window, the count is
> emitted. So if one partition's watermark lags behind the other, it just
> means the window output is triggered based on this lagging watermark.
> >
> > -Zach
> >
> > [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
> > [2]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147
> >
> >
> > On Thu, Feb 25, 2016 at 3:31 PM Zach Cox <zcox...@gmail.com> wrote:
> > Hi - how are watermarks passed along parallel tasks where there is a
> repartition? For example, say I have a simple streaming job computing
> hourly counts per key, something like this:
> >
> > val environment = StreamExecutionEnvironment.getExecutionEnvironment
> > environment.setParallelism(2)
> > environment.setStreamTimeCharacteristic(EventTime)
> > environment.getConfig.enableTimestamps()
> > environment
> >   .addSource(...)
> >   .assignAscendingTimestamps(_.timestamp)
> >   .keyBy("someField")
> >   .timeWindow(Time.hours(1))
> >   .fold(0, (count, element) => count + 1)
> >   .addSink(...)
> > environment.execute("example")
> >
> > Say the source has 2 parallel partitions (e.g. Kafka topic) and the
> events from the source contain timestamps, but over time the 2 source tasks
> diverge in event time (maybe 1 Kafka topic partition has many more events
> than the other).
> >
> > The job graph looks like this: http://imgur.com/hxEpF6b
> >
> > From what I can tell, the execution graph, with parallelism=2, would
> look like this: http://imgur.com/pSX8ov5. The keyBy causes a hash
> partition to be used, so that events with the same key end up at the same
> window subtask, regardless of which source partition they came from.
> >
> > Since the watermarks are skewed between the parallel pipelines, what
> happens when differing watermarks are sent to the window count operators?
> Is something tracking the min incoming watermark there? Could anyone point
> me to Flink code that implements this? I'd really like to learn more about
> how this works.
> >
> > Thanks,
> > Zach
> >
> >
>
>


Re: Watermarks with repartition

2016-02-25 Thread Zach Cox
I think I found the information I was looking for:

RecordWriter broadcasts each emitted watermark to all outgoing channels [1].

StreamInputProcessor tracks the max watermark received on each incoming
channel separately, and computes the task's watermark as the min of all
incoming watermarks [2].

Is this an accurate summary of Flink's watermark propagation?

So in my previous example, each window count task is building up a count
for each window based on incoming event's timestamp, and when all incoming
watermarks have progressed beyond the end of the window, the count is
emitted. So if one partition's watermark lags behind the other, it just
means the window output is triggered based on this lagging watermark.
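
In code form, my mental model of that bookkeeping is something like this
(simplified, not the actual Flink code):

// One instance per task: the task receives watermarks from several input
// channels and its own event time is the minimum of the per-channel maxima.
class WatermarkTracker(numberOfInputChannels: Int) {
  private val channelWatermarks = Array.fill(numberOfInputChannels)(Long.MinValue)
  private var lastEmitted = Long.MinValue

  // Returns Some(newWatermark) if the task-level watermark advanced, else None.
  def onWatermark(channel: Int, watermark: Long): Option[Long] = {
    if (watermark > channelWatermarks(channel)) {
      channelWatermarks(channel) = watermark
      val min = channelWatermarks.min
      if (min > lastEmitted) {
        lastEmitted = min
        return Some(min)
      }
    }
    None
  }
}

// e.g. with 2 channels: onWatermark(0, 100) => None (channel 1 still at Long.MinValue),
// then onWatermark(1, 80) => Some(80), then onWatermark(1, 120) => Some(100)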

-Zach

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
[2]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147


On Thu, Feb 25, 2016 at 3:31 PM Zach Cox <zcox...@gmail.com> wrote:

> Hi - how are watermarks passed along parallel tasks where there is a
> repartition? For example, say I have a simple streaming job computing
> hourly counts per key, something like this:
>
> val environment = StreamExecutionEnvironment.getExecutionEnvironment
> environment.setParallelism(2)
> environment.setStreamTimeCharacteristic(EventTime)
> environment.getConfig.enableTimestamps()
> environment
>   .addSource(...)
>   .assignAscendingTimestamps(_.timestamp)
>   .keyBy("someField")
>   .timeWindow(Time.hours(1))
>   .fold(0, (count, element) => count + 1)
>   .addSink(...)
> environment.execute("example")
>
> Say the source has 2 parallel partitions (e.g. Kafka topic) and the events
> from the source contain timestamps, but over time the 2 source tasks
> diverge in event time (maybe 1 Kafka topic partition has many more events
> than the other).
>
> The job graph looks like this: http://imgur.com/hxEpF6b
>
> From what I can tell, the execution graph, with parallelism=2, would look
> like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to
> be used, so that events with the same key end up at the same window
> subtask, regardless of which source partition they came from.
>
> Since the watermarks are skewed between the parallel pipelines, what
> happens when differing watermarks are sent to the window count operators?
> Is something tracking the min incoming watermark there? Could anyone point
> me to Flink code that implements this? I'd really like to learn more about
> how this works.
>
> Thanks,
> Zach
>
>
>


Watermarks with repartition

2016-02-25 Thread Zach Cox
Hi - how are watermarks passed along parallel tasks where there is a
repartition? For example, say I have a simple streaming job computing
hourly counts per key, something like this:

val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(2)
environment.setStreamTimeCharacteristic(EventTime)
environment.getConfig.enableTimestamps()
environment
  .addSource(...)
  .assignAscendingTimestamps(_.timestamp)
  .keyBy("someField")
  .timeWindow(Time.hours(1))
  .fold(0, (count, element) => count + 1)
  .addSink(...)
environment.execute("example")

Say the source has 2 parallel partitions (e.g. Kafka topic) and the events
from the source contain timestamps, but over time the 2 source tasks
diverge in event time (maybe 1 Kafka topic partition has many more events
than the other).

The job graph looks like this: http://imgur.com/hxEpF6b

From what I can tell, the execution graph, with parallelism=2, would look
like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to
be used, so that events with the same key end up at the same window
subtask, regardless of which source partition they came from.

Since the watermarks are skewed between the parallel pipelines, what
happens when differing watermarks are sent to the window count operators?
Is something tracking the min incoming watermark there? Could anyone point
me to Flink code that implements this? I'd really like to learn more about
how this works.

Thanks,
Zach


Re:

2016-02-23 Thread Zach Cox
Hi Ufuk - here is the jira issue with the requested information:
https://issues.apache.org/jira/browse/FLINK-3483

-Zach


On Tue, Feb 23, 2016 at 8:59 AM Ufuk Celebi <u...@apache.org> wrote:

> Hey Zach! I'm not aware of an open issue for this.
>
> You can go ahead and open an issue for it. It will be very helpful to
> include the following:
> - exact Chrome and OS X version
> - the exectuion plan as JSON (via env.getExecutionPlan())
> - screenshot
>
> Thanks!
>
> – Ufuk
>
>
> On Tue, Feb 23, 2016 at 3:46 PM, Zach Cox <zcox...@gmail.com> wrote:
> > Hi - I typically use the Chrome browser on OS X, and notice that with
> > 1.0.0-rc0 the job graph visualization displays the nodes in the graph,
> but
> > not any of the edges. Also the graph does not move around when dragging
> the
> > mouse.
> >
> > The job graph visualization seems to work perfectly in Safari and
> Firefox on
> > OS X, however.
> >
> > Is this a known issue or should I open a jira ticket?
> >
> > Thanks,
> > Zach
> >
>


[no subject]

2016-02-23 Thread Zach Cox
Hi - I typically use the Chrome browser on OS X, and notice that with
1.0.0-rc0 the job graph visualization displays the nodes in the graph, but
not any of the edges. Also the graph does not move around when dragging the
mouse.

The job graph visualization seems to work perfectly in Safari and Firefox
on OS X, however.

Is this a known issue or should I open a jira ticket?

Thanks,
Zach


Re: Using numberOfTaskSlots to control parallelism

2016-02-20 Thread Zach Cox
Thanks for the input Aljoscha and Ufuk! I will try out the #2 approach and
report back.

Thanks,
Zach


On Sat, Feb 20, 2016 at 7:26 AM Ufuk Celebi  wrote:

> On Sat, Feb 20, 2016 at 10:12 AM, Aljoscha Krettek 
> wrote:
> > IMHO the only change for 2) is that you possibly get better machine
> utilization because it will use more parallel threads.  So I think it’s a
> valid approach.
> >
> > @Ufuk, could there be problems with the number of network buffers? I
> think not, because the connections are multiplexed in one channel, is this
> correct?
>
> I would not expect it to become a problem. If it does, it's easy to
> resolve by throwing a little more memory at the problem. [1]
>
> – Ufuk
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
>


Using numberOfTaskSlots to control parallelism

2016-02-19 Thread Zach Cox
What would the differences be between these scenarios?

1) one task manager with numberOfTaskSlots=1 and one job with parallelism=1

2) one task manager with numberOfTaskSlots=10 and one job with
parallelism=10

In both cases all of the job's tasks get executed within the one task
manager's jvm. Are there any downsides to doing #2 instead of #1?

I ask this question because of current issues related to watermarks with
Kafka sources [1] [2] and changing parallelism with savepoints [3]. I'm
writing a Flink job that processes events from Kafka topics that have 12
partitions. I'm wondering if I should just set the job parallelism=12 and
make numberOfTaskSlots sum to 12 across however many task managers I set
up. It seems like watermarks would work properly then, and I could
effectively change job parallelism using the number of task managers (e.g.
1 TM with slots=12, or 2 TMs with slots=6, or 12 TMs with slots=1, etc).
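
Concretely, the single-task-manager variant of that (1 TM with slots=12) is
just the slot count in flink-conf.yaml plus a matching job parallelism,
something like:

import org.apache.flink.streaming.api.scala._

// flink-conf.yaml on the single task manager:
//   taskmanager.numberOfTaskSlots: 12
object TwelvePartitionJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // match the 12 Kafka partitions so each source subtask owns one partition
    env.setParallelism(12)
    // ... sources, operators, sinks ...
    env.execute("twelve-partition-job")
  }
}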

Am I missing any important details that would make this a bad idea? It
seems like a bit of abuse of numberOfTaskSlots, but also seems like a
fairly simple solution to a few current issues.

Thanks,
Zach

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tt4782.html
[2] https://issues.apache.org/jira/browse/FLINK-3375
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-parallelism-tt4967.html


Re: Changing parallelism

2016-02-18 Thread Zach Cox
Hi Ufuk - thanks for the 2016 roadmap - glad to see changing parallelism is
the first bullet :)  Mesos support also sounds great, we're currently
running job and task managers on Mesos statically via Marathon.

Hi Stephan - thanks, that trick sounds pretty clever, I will try wrapping
my head around using 2 different jobs and uids like that.

-Zach


On Thu, Feb 18, 2016 at 7:13 AM Stephan Ewen <se...@apache.org> wrote:

> Hi Zach!
>
> Yes, changing parallelism is pretty high up the priority list. The good
> news is that "scaling in" is the simpler part of changing the parallelism
> and we are pushing to get that in soon.
>
>
> Until then, there is only a pretty ugly trick that you can do right now to
> "rescale' the state:
>
>   1) savepoint with high parallelism
>
>   2) run an intermediate job that has the state twice in two operators,
> once with high parallelism, once with low. Emit the state from the first
> operator, write in the second. The first operator has the operator ID of
> the initial high-parallelism state.
>
>   3) Run the low parallelism job, and the stateful operator needs the ID
> of the second (low parallelism) operator in the intermediate job.
>
>
> Greetings,
> Stephan
>
>
> On Thu, Feb 18, 2016 at 9:24 AM, Ufuk Celebi <u...@apache.org> wrote:
>
>> Hey Zach!
>>
>> Sounds like a great use case.
>>
>> On Wed, Feb 17, 2016 at 3:16 PM, Zach Cox <zcox...@gmail.com> wrote:
>> > However, the savepoint docs state that the job parallelism cannot be
>> changed
>> > over time [1]. Does this mean we need to use the same, fixed
>> parallelism=n
>> > during reprocessing and going forward? Are there any tricks or
>> workarounds
>> > we could use to still make changes to parallelism and take advantage of
>> > savepoints?
>>
>> Yes, currently you have to keep the parallelism fixed. Dynamic scale
>> in and out of programs will have very high priority after the 1.0
>> release [1]. Unfortunately, I'm not aware of any work arounds to
>> overcome this at the moment.
>>
>> – Ufuk
>>
>> [1] https://flink.apache.org/news/2015/12/18/a-year-in-review.html (at
>> the end of the post there is a road map for 2016)
>>
>
>


Re: Availability for the ElasticSearch 2 streaming connector

2016-02-18 Thread Zach Cox
Awesome, thanks Suneel. :D

I made the changes to support our use case, which needed flatMap behavior
(index two docs, or zero docs, per incoming element) instead of map; we also
needed to create either an IndexRequest or an UpdateRequest depending on the
element.

-Zach


On Thu, Feb 18, 2016 at 2:06 AM Suneel Marthi <smar...@apache.org> wrote:

> Thanks Zach, I have a few minor changes too locally; I'll push a PR out
> tomorrow that has ur changes too.
>
> On Wed, Feb 17, 2016 at 5:13 PM, Zach Cox <zcox...@gmail.com> wrote:
>
>> I recently did exactly what Robert described: I copied the code from this
>> (closed) PR https://github.com/apache/flink/pull/1479, modified it a
>> bit, and just included it in my own project that uses the Elasticsearch 2
>> java api. Seems to work well. Here are the files so you can do the same:
>>
>> https://gist.github.com/zcox/59e486be7aeeca381be0
>>
>> -Zach
>>
>>
>> On Wed, Feb 17, 2016 at 4:06 PM Suneel Marthi <suneel.mar...@gmail.com>
>> wrote:
>>
>>> Hey I missed this thread, sorry about that.
>>>
>>> I have a basic connector working with ES 2.0 which I can push out. It's
>>> not optimized yet and I don't have the time to look at it; if someone would
>>> like to take it over, go ahead and I can send a PR.
>>>
>>> On Wed, Feb 17, 2016 at 4:57 PM, Robert Metzger <rmetz...@apache.org>
>>> wrote:
>>>
>>>> Hi Mihail,
>>>>
>>>> It seems that nobody is actively working on the elasticsearch2
>>>> connector right now. The 1.0.0 release is already feature frozen, only bug
>>>> fixes or (some) pending pull requests go in.
>>>>
>>>> What you can always do is copy the code from our current elasticsearch
>>>> connector, set the dependency to the version you would like to use, and
>>>> adapt our code to their API changes. I think it should not take much time
>>>> to get it working.
>>>> (The reason why we usually need more time for stuff like this are
>>>> integration tests and documentation).
>>>>
>>>> Please let me know if that solution doesn't work for you.
>>>>
>>>> Regards,
>>>> Robert
>>>>
>>>>
>>>> On Tue, Feb 16, 2016 at 2:53 PM, Vieru, Mihail <mihail.vi...@zalando.de
>>>> > wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> in reference to this ticket
>>>>> https://issues.apache.org/jira/browse/FLINK-3115 when do you think
>>>>> that an ElasticSearch 2 streaming connector will become available? Will it
>>>>> make it for the 1.0 release?
>>>>>
>>>>> That would be great, as we are planning to use that particular version
>>>>> of ElasticSearch in the very near future.
>>>>>
>>>>> Best regards,
>>>>> Mihail
>>>>>
>>>>
>>>>
>>>
>


Re: Availability for the ElasticSearch 2 streaming connector

2016-02-17 Thread Zach Cox
I recently did exactly what Robert described: I copied the code from this
(closed) PR https://github.com/apache/flink/pull/1479, modified it a bit,
and just included it in my own project that uses the Elasticsearch 2 java
api. Seems to work well. Here are the files so you can do the same:

https://gist.github.com/zcox/59e486be7aeeca381be0

-Zach


On Wed, Feb 17, 2016 at 4:06 PM Suneel Marthi 
wrote:

> Hey I missed this thread, sorry about that.
>
> I have a basic connector working with ES 2.0 which I can push out. It's
> not optimized yet and I don't have the time to look at it; if someone would
> like to take it over, go ahead and I can send a PR.
>
> On Wed, Feb 17, 2016 at 4:57 PM, Robert Metzger 
> wrote:
>
>> Hi Mihail,
>>
>> It seems that nobody is actively working on the elasticsearch2 connector
>> right now. The 1.0.0 release is already feature frozen, only bug fixes or
>> (some) pending pull requests go in.
>>
>> What you can always do is copy the code from our current elasticsearch
>> connector, set the dependency to the version you would like to use, and
>> adapt our code to their API changes. I think it should not take much time
>> to get it working.
>> (The reason why we usually need more time for stuff like this are
>> integration tests and documentation).
>>
>> Please let me know if that solution doesn't work for you.
>>
>> Regards,
>> Robert
>>
>>
>> On Tue, Feb 16, 2016 at 2:53 PM, Vieru, Mihail 
>> wrote:
>>
>>> Hi,
>>>
>>> in reference to this ticket
>>> https://issues.apache.org/jira/browse/FLINK-3115 when do you think that
>>> an ElasticSearch 2 streaming connector will become available? Will it make
>>> it for the 1.0 release?
>>>
>>> That would be great, as we are planning to use that particular version
>>> of ElasticSearch in the very near future.
>>>
>>> Best regards,
>>> Mihail
>>>
>>
>>
>


Changing parallelism

2016-02-17 Thread Zach Cox
Hi - we are building a stateful Flink streaming job that will run
indefinitely. One part of the job builds up state per key in a global
window that will need to exist for a very long time. We will definitely be
using the savepoints to restore job state after new code deploys.

We were planning to be able to increase the parallelism of the job
incrementally over time, as the volume of input data grows. We also have a
large amount of historical data loaded into Kafka we'd like to reprocess
initially with the streaming job to backfill Elasticsearch, and then
transition the job seamlessly to nearline processing. We were planning to
use a large parallelism during the historical reprocessing, and then
decrease it when the job has caught up to new events.

However, the savepoint docs state that the job parallelism cannot be
changed over time [1]. Does this mean we need to use the same, fixed
parallelism=n during reprocessing and going forward? Are there any tricks
or workarounds we could use to still make changes to parallelism and take
advantage of savepoints?

Thanks,
Zach

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html#current-limitations