Re: Kafka state backend?
Ah I didn't see that, thanks for the link! Glad this is being discussed. On Thu, Apr 7, 2016 at 5:06 AM Aljoscha Krettek <aljos...@apache.org> wrote: > Hi Zach, > I'm afraid someone already beat you to it :-) > https://issues.apache.org/jira/browse/FLINK-3692 > > In the issue we touch on some of the difficulties with this that stem from > the differences in the guarantees that Flink and Samza try to give. > > Cheers, > Aljoscha > > On Tue, 5 Apr 2016 at 22:24 Zach Cox <zcox...@gmail.com> wrote: > >> Hi - as clarified in another thread [1] stateful operators store all of >> their current state in the backend on each checkpoint. Just curious if >> Kafka topics with log compaction have ever been considered as a possible >> state backend? >> >> Samza [2] uses RocksDB as a local state store, with all writes also going >> to a log-compacted Kafka topic for persistence. This seems like it might >> also be a good alternative backend in Flink for jobs with large amounts of >> long-lasting state. You would give up some throughput (due to Kafka >> producer writes) but there would be almost nothing to do on checkpoints. >> >> Just wanted to propose the idea and see if it has already been discussed, >> or maybe I'm missing some reasons why it would be a bad idea. >> >> Thanks, >> Zach >> >> [1] >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-state-stored-in-backend-and-deleting-old-checkpoint-state-td5927.html >> [2] >> http://samza.apache.org/learn/documentation/0.10/container/state-management.html#local-state-in-samza >> >>
Re: Back Pressure details
Yeah I don't think that's the case for my setup either :) I wrote a simple Flink job that just consumes from Kafka and sinks events/sec rate to Graphite. It consumes from Kafka at a rate several orders of magnitude higher than the job that also sinks to Elasticsearch. As you said, the downstream back pressure must also be slowing down consumption from Kafka, even though the job manager UI doesn't show HIGH back pressure on the Kafka source. Thanks for the details! -Zach On Wed, Apr 6, 2016 at 2:37 PM Ufuk Celebi <u...@apache.org> wrote: > Ah sorry, I forgot to mention that in the docs. > > The way that data is pulled from Kafka is bypassing Flink's task > Thread. The topic is consumed in a separate Thread and the task Thread > is just waiting. That's why you don't see any back pressure for Kafka > sources. I would expect your Kafka source to be back pressured as well > then. > > In theory it is possible that the speed at which data is consumed in > the source "matches" the speed of the back pressured operator down > stream. That would result in a non back pressured source with a down > stream back pressured task. But I don't think that's the case for your > setup. ;-) > > On Wed, Apr 6, 2016 at 9:27 PM, Zach Cox <zcox...@gmail.com> wrote: > > The new back pressure docs are great, thanks Ufuk! I'm sure those will > help > > others as well. > > > > In the Source => A => B => Sink example, if A and B show HIGH back > pressure, > > should Source also show HIGH? In my case it's a Kafka source and > > Elasticsearch sink. I know currently our Kafka can provide data at a much > > higher rate than our Elasticsearch can ingest (I'm working on scaling up > > Elasticsearch), just curious why the Kafka source wouldn't also show HIGH > > back pressure. 
> > > > Thanks, > > Zach > > > > > > On Wed, Apr 6, 2016 at 5:36 AM Ufuk Celebi <u...@apache.org> wrote: > >> > >> Hey Zach, > >> > >> just added some documentation, which will be available in ~ 30 mins > >> here: > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/back_pressure_monitoring.html > >> > >> If you think that something is missing there, I would appreciate some > >> feedback. :-) > >> > >> Back pressure is determined by repeatedly calling getStackTrace() on > >> the task Threads executing the job. By default, 100 times with 50ms > >> delay between calls. If the task thread is stuck in an internal method > >> call requesting buffers from the network stack, this indicates back > >> pressure. > >> > >> The ratio you see tells you how many of the stack traces were stuck in > >> that method (e.g. 1 out of 100) and the status codes just group those > >> in a (hopefully) reasonable way (<= 0.10 is OK, <= 0.5 is LOW, > 0.5 > >> is HIGH). > >> > >> If you have a task with back pressure this means that it is producing > >> data faster than the network can consume, for example because the > >> downstream operator is slow or the network can't handle it. Your > >> Source => A => B => Sink example suggests that the sink is slowing > >> down/back pressuring B, which is in turn slowing down/back pressuring > >> A. > >> > >> Does this help? > >> > >> Keep in mind though that it is not a rock solid approach and there is > >> a chance that we miss the back pressure indicators or always sample > >> when we the task is requesting buffers (which is happening all the > >> time). It often works better at the extremes, e.g. when there is no > >> back pressure at all or very high back pressure. > >> > >> – Ufuk > >> > >> > >> On Tue, Apr 5, 2016 at 10:47 PM, Zach Cox <zcox...@gmail.com> wrote: > >> > Hi - I'm trying to identify bottlenecks in my Flink streaming job, and > >> > am > >> > curious about the Back Pressure view in the job manager web UI. 
If > there > >> > are > >> > already docs for Back Pressure please feel free to just point me to > >> > those. > >> > :) > >> > > >> > When "Sampling in progress..." is displayed, what exactly is > happening? > >> > > >> > What do the values in the Ratio column for each Subtask mean exactly? > >> > > >> > What does Status such as OK, High, etc mean? Are these determined from > >> > the > >> > Ratio values? > >> > > >> > If my job graph looks like Source => A => B => Sink, with Back > Pressure > >> > OK > >> > for Source and Sink, but High for A and B, what does that suggest? > >> > > >> > Thanks, > >> > Zach > >> > >
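For readers skimming the thread, the sampling scheme Ufuk describes can be sketched in a few lines of plain Java. This is an illustrative model only, not Flink's actual implementation; the class and method names are made up, and the thresholds are the ones from his email (ratio <= 0.10 is OK, <= 0.5 is LOW, > 0.5 is HIGH):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model of Flink's back pressure sampling (all names here are
// made up, not Flink internals). Each sample answers one question: was the
// task thread stuck requesting a buffer from the network stack?
public class BackPressureSketch {

    // Fraction of samples in which the thread was stuck requesting buffers.
    public static double ratio(List<Boolean> stuckSamples) {
        long stuck = stuckSamples.stream().filter(b -> b).count();
        return (double) stuck / stuckSamples.size();
    }

    // Thresholds as described in the email: <= 0.10 OK, <= 0.5 LOW, else HIGH.
    public static String status(double ratio) {
        if (ratio <= 0.10) return "OK";
        if (ratio <= 0.5) return "LOW";
        return "HIGH";
    }

    public static void main(String[] args) {
        // 1 stuck sample out of 4 -> ratio 0.25 -> LOW
        System.out.println(status(ratio(Arrays.asList(true, false, false, false))));
    }
}
```

This also makes Ufuk's caveat concrete: with only 100 samples at 50ms apart, a task that blocks rarely (or always at the sampled instant) can land on either side of a threshold, which is why the readings are most trustworthy at the extremes.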
Re: Checkpoint state stored in backend, and deleting old checkpoint state
Hi Stephan - incremental checkpointing sounds really interesting and useful, I look forward to trying it out. Thanks, Zach On Wed, Apr 6, 2016 at 4:39 AM Stephan Ewen <se...@apache.org> wrote: > Hi Zach! > > I am working on incremental checkpointing, hope to have it in the master > in the next weeks. > > The current approach is a to have a full self-contained checkpoint every > once in a while, and have incremental checkpoints most of the time. Having > a full checkpoint every now and then spares you from re-applying an endless > set of deltas on recovery. > > Related to that is also making the checkpointing asynchronous, so that > normal operations do not see any disruption any more. > > Greetings, > Stephan > > On Tue, Apr 5, 2016 at 10:25 PM, Zach Cox <zcox...@gmail.com> wrote: > >> Thanks for the details Konstantin and Ufuk! >> >> >> On Tue, Apr 5, 2016 at 2:39 PM Konstantin Knauf < >> konstantin.kn...@tngtech.com> wrote: >> >>> Hi Ufuk, >>> >>> I thought so, but I am not sure when and where ;) I will let you know, >>> if I come across it again. >>> >>> Cheers, >>> >>> Konstantin >>> >>> On 05.04.2016 21:10, Ufuk Celebi wrote: >>> > Hey Zach and Konstantin, >>> > >>> > Great questions and answers. We can try to make this more explicit in >>> the docs. >>> > >>> > On Tue, Apr 5, 2016 at 8:54 PM, Konstantin Knauf >>> > <konstantin.kn...@tngtech.com> wrote: >>> >> To my knowledge flink takes care of deleting old checkpoints (I think >>> it >>> >> says so in the documentation about savepoints.). In my experience >>> >> though, if a job is cancelled or crashes, the checkpoint files are >>> >> usually not cleaned up. So some housekeeping might be necessary. >>> > >>> > Regarding cleanup: currently only the latest successful checkpoint is >>> retained. >>> > >>> > On graceful shutdown, all checkpoints should be cleaned up as far as I >>> > know. Savepoints always have to be cleaned up manually. 
>>> > >>> > On crashes, the checkpoint state has to be cleaned up manually (if the >>> > JVM shut down hooks did not run). >>> > >>> > @Konstantin: did you have lingering state without crashes? >>> > >>> > – Ufuk >>> > >>> >>> -- >>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182 >>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring >>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke >>> Sitz: Unterföhring * Amtsgericht München * HRB 135082 >>> >> >
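Stephan's full-plus-incremental scheme can be sketched with a toy key-value state in plain Java. Everything below is an assumption for illustration — the class names, the interval, and the storage layout are not Flink's implementation — but it shows why an occasional full checkpoint bounds the delta chain that recovery must replay:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of full-plus-incremental checkpointing (illustrative only; not
// Flink's implementation). Note: this toy tracks updates only, not deletes.
public class IncrementalCheckpointSketch {
    private static final int FULL_EVERY = 3; // every 3rd checkpoint is self-contained

    private final Map<String, String> state = new HashMap<>();
    private final Map<String, String> dirty = new HashMap<>(); // changed since last checkpoint
    private final List<Map<String, String>> checkpoints = new ArrayList<>();
    private final List<Boolean> isFull = new ArrayList<>();

    public void put(String key, String value) {
        state.put(key, value);
        dirty.put(key, value);
    }

    // Full checkpoint: snapshot everything. Incremental: snapshot only the delta.
    public void checkpoint() {
        boolean full = checkpoints.size() % FULL_EVERY == 0;
        checkpoints.add(new HashMap<>(full ? state : dirty));
        isFull.add(full);
        dirty.clear();
    }

    // Recovery: start from the latest full checkpoint, then re-apply each
    // delta after it -- never an unbounded chain of deltas.
    public Map<String, String> recover() {
        int base = 0;
        for (int i = 0; i < checkpoints.size(); i++) {
            if (isFull.get(i)) base = i;
        }
        Map<String, String> restored = new HashMap<>(checkpoints.get(base));
        for (int i = base + 1; i < checkpoints.size(); i++) {
            restored.putAll(checkpoints.get(i));
        }
        return restored;
    }
}
```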
Re: Back Pressure details
The new back pressure docs are great, thanks Ufuk! I'm sure those will help others as well. In the Source => A => B => Sink example, if A and B show HIGH back pressure, should Source also show HIGH? In my case it's a Kafka source and Elasticsearch sink. I know currently our Kafka can provide data at a much higher rate than our Elasticsearch can ingest (I'm working on scaling up Elasticsearch), just curious why the Kafka source wouldn't also show HIGH back pressure. Thanks, Zach On Wed, Apr 6, 2016 at 5:36 AM Ufuk Celebi <u...@apache.org> wrote: > Hey Zach, > > just added some documentation, which will be available in ~ 30 mins > here: > https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/back_pressure_monitoring.html > > If you think that something is missing there, I would appreciate some > feedback. :-) > > Back pressure is determined by repeatedly calling getStackTrace() on > the task Threads executing the job. By default, 100 times with 50ms > delay between calls. If the task thread is stuck in an internal method > call requesting buffers from the network stack, this indicates back > pressure. > > The ratio you see tells you how many of the stack traces were stuck in > that method (e.g. 1 out of 100) and the status codes just group those > in a (hopefully) reasonable way (<= 0.10 is OK, <= 0.5 is LOW, > 0.5 > is HIGH). > > If you have a task with back pressure this means that it is producing > data faster than the network can consume, for example because the > downstream operator is slow or the network can't handle it. Your > Source => A => B => Sink example suggests that the sink is slowing > down/back pressuring B, which is in turn slowing down/back pressuring > A. > > Does this help? > > Keep in mind though that it is not a rock solid approach and there is > a chance that we miss the back pressure indicators or always sample > when we the task is requesting buffers (which is happening all the > time). 
It often works better at the extremes, e.g. when there is no > back pressure at all or very high back pressure. > > – Ufuk > > > On Tue, Apr 5, 2016 at 10:47 PM, Zach Cox <zcox...@gmail.com> wrote: > > Hi - I'm trying to identify bottlenecks in my Flink streaming job, and am > > curious about the Back Pressure view in the job manager web UI. If there > are > > already docs for Back Pressure please feel free to just point me to > those. > > :) > > > > When "Sampling in progress..." is displayed, what exactly is happening? > > > > What do the values in the Ratio column for each Subtask mean exactly? > > > > What does Status such as OK, High, etc mean? Are these determined from > the > > Ratio values? > > > > If my job graph looks like Source => A => B => Sink, with Back Pressure > OK > > for Source and Sink, but High for A and B, what does that suggest? > > > > Thanks, > > Zach > > >
Back Pressure details
Hi - I'm trying to identify bottlenecks in my Flink streaming job, and am curious about the Back Pressure view in the job manager web UI. If there are already docs for Back Pressure please feel free to just point me to those. :)

When "Sampling in progress..." is displayed, what exactly is happening?

What do the values in the Ratio column for each Subtask mean exactly?

What does Status such as OK, High, etc. mean? Are these determined from the Ratio values?

If my job graph looks like Source => A => B => Sink, with Back Pressure OK for Source and Sink, but High for A and B, what does that suggest?

Thanks,
Zach
Kafka state backend?
Hi - as clarified in another thread [1], stateful operators store all of their current state in the backend on each checkpoint. Just curious if Kafka topics with log compaction have ever been considered as a possible state backend?

Samza [2] uses RocksDB as a local state store, with all writes also going to a log-compacted Kafka topic for persistence. This seems like it might also be a good alternative backend in Flink for jobs with large amounts of long-lasting state. You would give up some throughput (due to Kafka producer writes) but there would be almost nothing to do on checkpoints.

Just wanted to propose the idea and see if it has already been discussed, or maybe I'm missing some reasons why it would be a bad idea.

Thanks,
Zach

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-state-stored-in-backend-and-deleting-old-checkpoint-state-td5927.html
[2] http://samza.apache.org/learn/documentation/0.10/container/state-management.html#local-state-in-samza
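The Samza-style scheme described above can be modeled in plain Java without a real Kafka client or RocksDB (everything below is a stand-in: a map for the local store, a list for the compacted topic). The point of log compaction is that the topic converges to the latest record per key, which is exactly what a restore needs to replay:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of Samza-style changelog state (no real Kafka or RocksDB; the
// map stands in for the local store, the list for the log-compacted topic).
public class ChangelogStateSketch {
    private final Map<String, String> localStore = new HashMap<>();
    private final List<Map.Entry<String, String>> changelog = new ArrayList<>();

    // Every local write is also appended to the changelog topic.
    public void put(String key, String value) {
        localStore.put(key, value);
        changelog.add(new AbstractMap.SimpleEntry<>(key, value));
    }

    // Log compaction converges to the latest record per key...
    public Map<String, String> compacted() {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : changelog) {
            latest.put(e.getKey(), e.getValue());
        }
        return latest;
    }

    // ...which is exactly what a restore after failure needs to replay.
    public Map<String, String> restore() {
        return new HashMap<>(compacted());
    }
}
```

This also illustrates the trade-off named in the email: `put` does double work (local write plus changelog append), in exchange for a checkpoint step that has almost nothing left to do.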
Checkpoint state stored in backend, and deleting old checkpoint state
Hi - I have some questions regarding Flink's checkpointing, specifically related to storing state in the backends.

So let's say an operator in a streaming job is building up some state. When it receives barriers from all of its input streams, does it store *all* of its state to the backend? I think that is what the docs [1] and paper [2] imply, but I want to make sure. In other words, if the operator contains 100MB of state, and the backend is HDFS, does the operator copy all 100MB of state to HDFS during the checkpoint?

Following on this example, say the operator is a global window and is storing some state for each unique key observed in the stream of messages (e.g. userId). Assume that over time, the number of observed unique keys grows, so the size of the state also grows (the window state is never purged). Is the entire operator state at the time of each checkpoint stored to the backend, so that over time, the size of the state stored for each checkpoint grows? Or is the state stored to the backend somehow just the state that changed in some way since the last checkpoint?

Are old checkpoint states in the backend ever deleted / cleaned up? That is, if all of the state for checkpoint n in the backend is all that is needed to restore a failed job, then all state for all checkpoints m < n should not be needed any more, right? Can all of those old checkpoints be deleted from the backend? Does Flink do this?

Thanks,
Zach

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
[2] http://arxiv.org/abs/1506.08603
Re: Upserts with Flink-elasticsearch
Hi Madhukar - with the current Elasticsearch sink in Flink 1.0.0 [1], I don't think an upsert is possible, since IndexRequestBuilder can only return an IndexRequest. In Flink 1.1, the Elasticsearch 2.x sink [2] provides a RequestIndexer [3] that you can pass an UpdateRequest to, in order to do an upsert.

Thanks,
Zach

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/connectors/elasticsearch.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html
[3] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.html

On Mon, Mar 28, 2016 at 2:18 PM Madhukar Thota wrote:
> Is it possible to do Upsert with existing flink-elasticsearch connector
> today?
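As a reference for what an upsert request means, here is a toy in-memory model (not the Elasticsearch client API; the names are illustrative). An UpdateRequest with an upsert carries two payloads: a partial update to apply if the document already exists, and a full document to insert if it does not:

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of Elasticsearch upsert semantics (not the real
// client API). Upsert = partial update if present, insert if absent.
public class UpsertSketch {
    private final Map<String, Map<String, Object>> index = new HashMap<>();

    public void upsert(String id, Map<String, Object> partialUpdate, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = index.get(id);
        if (existing == null) {
            index.put(id, new HashMap<>(upsertDoc)); // absent: insert the upsert doc
        } else {
            existing.putAll(partialUpdate);          // present: merge the partial update
        }
    }

    public Map<String, Object> get(String id) {
        return index.get(id);
    }
}
```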
Accumulators checkpointed?
Are accumulators stored in checkpoint state? If a job fails and restarts, are all accumulator values lost, or are they restored from checkpointed state? Thanks, Zach
Re: asm IllegalArgumentException with 1.0.0
Yes pretty much - we use sbt to run the job in a local environment, not Intellij, but should be the same thing. We were also seeing that exception running unit tests locally. We did not see the exception when assembling a fat jar and submitting to a remote Flink cluster. It seems like the flink-connector-elasticsearch jar should not have shaded classes in it. Maybe that jar in maven central was built incorrectly? We worked around this by just not depending on that elasticsearch connector at all, since we wrote our own connector for Elasticsearch 2.x. -Zach On Mon, Mar 14, 2016 at 2:03 PM Andrew Whitaker < andrew.whita...@braintreepayments.com> wrote: > We're having the same issue (we also have a dependency on > flink-connector-elasticsearch). It's only happening to us in IntelliJ > though. Is this the case for you as well? > > On Thu, Mar 10, 2016 at 3:20 PM, Zach Cox <zcox...@gmail.com> wrote: > >> After some poking around I noticed >> that flink-connector-elasticsearch_2.10-1.0.0.jar contains shaded asm >> classes. If I remove that dependency from my project then I do not get the >> IllegalArgumentException. >> >> >> On Thu, Mar 10, 2016 at 11:51 AM Zach Cox <zcox...@gmail.com> wrote: >> >>> Here are the jars on the classpath when I try to run our Flink job in a >>> local environment (via `sbt run`): >>> >>> >>> https://gist.githubusercontent.com/zcox/0992aba1c517b51dc879/raw/7136ec034c2beef04bd65de9f125ce3796db511f/gistfile1.txt >>> >>> There are many transitive dependencies pulled in from internal library >>> projects that probably need to be cleaned out. Maybe we are including >>> something that conflicts? Or maybe something important is being excluded? >>> >>> Are the asm classes included in Flink jars in some shaded form? 
>>> >>> Thanks, >>> Zach >>> >>> >>> On Thu, Mar 10, 2016 at 5:06 AM Stephan Ewen <se...@apache.org> wrote: >>> >>>> Dependency shading changed a bit between RC4 and RC5 - maybe a >>>> different minor ASM version is now included in the "test" scope. >>>> >>>> Can you share the dependencies of the problematic project? >>>> >>>> On Thu, Mar 10, 2016 at 12:26 AM, Zach Cox <zcox...@gmail.com> wrote: >>>> >>>>> I also noticed when I try to run this application in a local >>>>> environment, I get the same IllegalArgumentException. >>>>> >>>>> When I assemble this application into a fat jar and run it on a Flink >>>>> cluster using the CLI tools, it seems to run fine. >>>>> >>>>> Maybe my local classpath is missing something that is provided on the >>>>> Flink task managers? >>>>> >>>>> -Zach >>>>> >>>>> >>>>> On Wed, Mar 9, 2016 at 5:16 PM Zach Cox <zcox...@gmail.com> wrote: >>>>> >>>>>> Hi - after upgrading to 1.0.0, I'm getting this exception now in a >>>>>> unit test: >>>>>> >>>>>>IllegalArgumentException: (null:-1) >>>>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.(Unknown >>>>>> Source) >>>>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.(Unknown >>>>>> Source) >>>>>> >>>>>> org.apache.flink.api.scala.InnerClosureFinder.(ClosureCleaner.scala:279) >>>>>> >>>>>> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95) >>>>>> >>>>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115) >>>>>> >>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568) >>>>>> >>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498) >>>>>> >>>>>> The line that causes that exception is just adding >>>>>> a FlinkKafkaConsumer08 source. 
>>>>>> >>>>>> ClassVisitor [1] seems to throw that IllegalArgumentException when it >>>>>> is not given a valid api version number, but InnerClosureFinder [2] looks >>>>>> fine to me. >>>>>> >>>>>> Any idea what might be causing this? This unit test worked fine with >>>>>> 1.0.0-rc0 jars. >>>>>> >>>>>> Thanks, >>>>>> Zach >>>>>> >>>>>> [1] >>>>>> http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java >>>>>> [2] >>>>>> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279 >>>>>> >>>>>> >>>>>> >>>> > > > -- > Andrew Whitaker | andrew.whita...@braintreepayments.com > -- > Note: this information is confidential. It is prohibited to share, post > online or otherwise publicize without Braintree's prior written consent. >
Re: asm IllegalArgumentException with 1.0.0
After some poking around I noticed that flink-connector-elasticsearch_2.10-1.0.0.jar contains shaded asm classes. If I remove that dependency from my project then I do not get the IllegalArgumentException. On Thu, Mar 10, 2016 at 11:51 AM Zach Cox <zcox...@gmail.com> wrote: > Here are the jars on the classpath when I try to run our Flink job in a > local environment (via `sbt run`): > > > https://gist.githubusercontent.com/zcox/0992aba1c517b51dc879/raw/7136ec034c2beef04bd65de9f125ce3796db511f/gistfile1.txt > > There are many transitive dependencies pulled in from internal library > projects that probably need to be cleaned out. Maybe we are including > something that conflicts? Or maybe something important is being excluded? > > Are the asm classes included in Flink jars in some shaded form? > > Thanks, > Zach > > > On Thu, Mar 10, 2016 at 5:06 AM Stephan Ewen <se...@apache.org> wrote: > >> Dependency shading changed a bit between RC4 and RC5 - maybe a different >> minor ASM version is now included in the "test" scope. >> >> Can you share the dependencies of the problematic project? >> >> On Thu, Mar 10, 2016 at 12:26 AM, Zach Cox <zcox...@gmail.com> wrote: >> >>> I also noticed when I try to run this application in a local >>> environment, I get the same IllegalArgumentException. >>> >>> When I assemble this application into a fat jar and run it on a Flink >>> cluster using the CLI tools, it seems to run fine. >>> >>> Maybe my local classpath is missing something that is provided on the >>> Flink task managers? 
>>> >>> -Zach >>> >>> >>> On Wed, Mar 9, 2016 at 5:16 PM Zach Cox <zcox...@gmail.com> wrote: >>> >>>> Hi - after upgrading to 1.0.0, I'm getting this exception now in a unit >>>> test: >>>> >>>>IllegalArgumentException: (null:-1) >>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.(Unknown >>>> Source) >>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.(Unknown >>>> Source) >>>> >>>> org.apache.flink.api.scala.InnerClosureFinder.(ClosureCleaner.scala:279) >>>> >>>> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95) >>>> >>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115) >>>> >>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568) >>>> >>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498) >>>> >>>> The line that causes that exception is just adding >>>> a FlinkKafkaConsumer08 source. >>>> >>>> ClassVisitor [1] seems to throw that IllegalArgumentException when it >>>> is not given a valid api version number, but InnerClosureFinder [2] looks >>>> fine to me. >>>> >>>> Any idea what might be causing this? This unit test worked fine with >>>> 1.0.0-rc0 jars. >>>> >>>> Thanks, >>>> Zach >>>> >>>> [1] >>>> http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java >>>> [2] >>>> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279 >>>> >>>> >>>> >>
Re: asm IllegalArgumentException with 1.0.0
Here are the jars on the classpath when I try to run our Flink job in a local environment (via `sbt run`): https://gist.githubusercontent.com/zcox/0992aba1c517b51dc879/raw/7136ec034c2beef04bd65de9f125ce3796db511f/gistfile1.txt There are many transitive dependencies pulled in from internal library projects that probably need to be cleaned out. Maybe we are including something that conflicts? Or maybe something important is being excluded? Are the asm classes included in Flink jars in some shaded form? Thanks, Zach On Thu, Mar 10, 2016 at 5:06 AM Stephan Ewen <se...@apache.org> wrote: > Dependency shading changed a bit between RC4 and RC5 - maybe a different > minor ASM version is now included in the "test" scope. > > Can you share the dependencies of the problematic project? > > On Thu, Mar 10, 2016 at 12:26 AM, Zach Cox <zcox...@gmail.com> wrote: > >> I also noticed when I try to run this application in a local environment, >> I get the same IllegalArgumentException. >> >> When I assemble this application into a fat jar and run it on a Flink >> cluster using the CLI tools, it seems to run fine. >> >> Maybe my local classpath is missing something that is provided on the >> Flink task managers? 
>> >> -Zach >> >> >> On Wed, Mar 9, 2016 at 5:16 PM Zach Cox <zcox...@gmail.com> wrote: >> >>> Hi - after upgrading to 1.0.0, I'm getting this exception now in a unit >>> test: >>> >>>IllegalArgumentException: (null:-1) >>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.(Unknown >>> Source) >>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.(Unknown >>> Source) >>> >>> org.apache.flink.api.scala.InnerClosureFinder.(ClosureCleaner.scala:279) >>> >>> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95) >>> >>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115) >>> >>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568) >>> >>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498) >>> >>> The line that causes that exception is just adding >>> a FlinkKafkaConsumer08 source. >>> >>> ClassVisitor [1] seems to throw that IllegalArgumentException when it is >>> not given a valid api version number, but InnerClosureFinder [2] looks fine >>> to me. >>> >>> Any idea what might be causing this? This unit test worked fine with >>> 1.0.0-rc0 jars. >>> >>> Thanks, >>> Zach >>> >>> [1] >>> http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java >>> [2] >>> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279 >>> >>> >>> >
Re: ExecutionConfig.enableTimestamps() in 1.0.0
I see the new Event Time docs page, thanks for fixing that! I like the additional explanation of event time and watermarks. I also updated our TimestampExtractors to AssignerWithPeriodicWatermarks as described in [1]. I like the separation between periodic and punctuated watermark assigners in the new API - it's definitely more clear how each one operates. Thanks, Zach [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_timestamps_watermarks.html On Thu, Mar 10, 2016 at 3:33 AM Ufuk Celebi <u...@apache.org> wrote: > Just removed the page. Triggering a new docs build... > > On Thu, Mar 10, 2016 at 10:22 AM, Aljoscha Krettek <aljos...@apache.org> > wrote: > > Then Stephan should have removed the old doc when adding the new one… :-) > >> On 10 Mar 2016, at 10:20, Ufuk Celebi <u...@apache.org> wrote: > >> > >> Just talked with Stephan: the document you are referring to is stale. > >> Can you check out this one here: > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_time.html > >> > >> – Ufuk > >> > >> > >> On Thu, Mar 10, 2016 at 10:17 AM, Ufuk Celebi <u...@apache.org> wrote: > >>> I've added this to the migration guide here: > >>> > https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x > >>> > >>> Feel free to add any other API changes that are missing there. > >>> > >>> – Ufuk > >>> > >>> > >>> On Thu, Mar 10, 2016 at 10:13 AM, Aljoscha Krettek < > aljos...@apache.org> wrote: > >>>> Hi, > >>>> you’re right, this should be changed to > “setStreamTimeCharacteristic(EventTime)” in the doc. > >>>> > >>>> Cheers, > >>>> Aljoscha > >>>>> On 09 Mar 2016, at 23:21, Zach Cox <zcox...@gmail.com> wrote: > >>>>> > >>>>> Hi - I'm upgrading our app to 1.0.0 and noticed ExecutionConfig no > longer has an enableTimestamps() method. Do we just not need to call that > at all now? > >>>>> > >>>>> The docs still say to call it [1] - do they just need to be updated? 
> >>>>> > >>>>> Thanks, > >>>>> Zach > >>>>> > >>>>> [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/time.html > >>>>> > >>>> > > >
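The bounded-out-of-orderness pattern behind a periodic watermark assigner can be modeled without Flink's interfaces (this sketch is plain Java and purely illustrative, not the actual AssignerWithPeriodicWatermarks API): track the maximum timestamp seen so far, and emit watermarks that lag it by the allowed out-of-orderness:

```java
// Plain-Java model of a periodic watermark assigner (illustrative; not the
// actual AssignerWithPeriodicWatermarks interface). The watermark lags the
// max timestamp seen by the allowed out-of-orderness, so late elements
// within the bound are still assigned to their windows.
public class PeriodicAssignerSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    public PeriodicAssignerSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called per element in the real API; out-of-order elements never move
    // the max backwards.
    public long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // In the real API the runtime calls this on a fixed interval -- that is
    // the "periodic" part, versus punctuated assigners that react per element.
    public long getCurrentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}
```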
Re: asm IllegalArgumentException with 1.0.0
I also noticed when I try to run this application in a local environment, I get the same IllegalArgumentException. When I assemble this application into a fat jar and run it on a Flink cluster using the CLI tools, it seems to run fine. Maybe my local classpath is missing something that is provided on the Flink task managers? -Zach On Wed, Mar 9, 2016 at 5:16 PM Zach Cox <zcox...@gmail.com> wrote: > Hi - after upgrading to 1.0.0, I'm getting this exception now in a unit > test: > >IllegalArgumentException: (null:-1) > org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.(Unknown > Source) > org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.(Unknown > Source) > > org.apache.flink.api.scala.InnerClosureFinder.(ClosureCleaner.scala:279) > > org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95) > org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115) > > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568) > > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498) > > The line that causes that exception is just adding a FlinkKafkaConsumer08 > source. > > ClassVisitor [1] seems to throw that IllegalArgumentException when it is > not given a valid api version number, but InnerClosureFinder [2] looks fine > to me. > > Any idea what might be causing this? This unit test worked fine with > 1.0.0-rc0 jars. > > Thanks, > Zach > > [1] > http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java > [2] > https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279 > > >
asm IllegalArgumentException with 1.0.0
Hi - after upgrading to 1.0.0, I'm getting this exception now in a unit test:

IllegalArgumentException: (null:-1)
org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.&lt;init&gt;(Unknown Source)
org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.&lt;init&gt;(Unknown Source)
org.apache.flink.api.scala.InnerClosureFinder.&lt;init&gt;(ClosureCleaner.scala:279)
org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)

The line that causes that exception is just adding a FlinkKafkaConsumer08 source.

ClassVisitor [1] seems to throw that IllegalArgumentException when it is not given a valid api version number, but InnerClosureFinder [2] looks fine to me.

Any idea what might be causing this? This unit test worked fine with 1.0.0-rc0 jars.

Thanks,
Zach

[1] http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java
[2] https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279
ExecutionConfig.enableTimestamps() in 1.0.0
Hi - I'm upgrading our app to 1.0.0 and noticed ExecutionConfig no longer has an enableTimestamps() method. Do we just not need to call that at all now?

The docs still say to call it [1] - do they just need to be updated?

Thanks,
Zach

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/time.html
Re: InetSocketAddress is not serializable
I ran into the same issue upgrading to Elasticsearch 2, here's how I solved it: https://gist.github.com/zcox/59e486be7aeeca381be0#file-elasticsearch2sink-java-L110

-Zach

On Fri, Mar 4, 2016 at 7:30 AM HungChang wrote:
> Hi,
>
> I'm building the connector for ElasticSearch2. One main issue for me now is
> that
>
> List transports = new ArrayList();
> transports.add(new InetSocketTransportAddress(new
> InetSocketAddress(TransportAddress, 9300)));
>
> throws
> java.io.NotSerializableException:
> org.elasticsearch.common.transport.InetSocketTransportAddress
>
> ES2 changed to this way and InetSocketAddress implements
> java.io.Serializable rather than Writable (is it the problem?).
> What would be the suggested way to handle this? should I implement a
> serializable InetSocketAddress?
>
> Best,
>
> Sendoh
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/InetSocketAddress-is-not-serializable-tp5296.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
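The workaround in the linked gist boils down to a general pattern: keep only serializable fields (e.g. host and port) in the sink, and build the non-serializable InetSocketAddress lazily on the worker after deserialization, typically in open(). A minimal standalone sketch of that pattern (class and method names are mine, not the gist's):

```java
import java.io.Serializable;
import java.net.InetSocketAddress;

// Sketch of the workaround pattern (simplified; names are illustrative):
// serialize only plain fields, rebuild the address lazily per worker.
public class SerializableAddress implements Serializable {
    private final String host;
    private final int port;
    private transient InetSocketAddress address; // never serialized; rebuilt per worker

    public SerializableAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Call after deserialization (in a Flink sink this would be open()).
    public InetSocketAddress resolve() {
        if (address == null) {
            // createUnresolved avoids a DNS lookup in this sketch
            address = InetSocketAddress.createUnresolved(host, port);
        }
        return address;
    }
}
```

Marking the cached field `transient` is what lets Java serialization skip it; the lazy rebuild means each deserialized copy constructs its own address on first use.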
Re: Watermarks with repartition
Thanks for the confirmation Aljoscha! I wrote up results from my little experiment: https://github.com/zcox/flink-repartition-watermark-example

-Zach

On Fri, Feb 26, 2016 at 2:58 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> yes, your description is spot on!
>
> Cheers,
> Aljoscha
>
> > On 26 Feb 2016, at 00:19, Zach Cox <zcox...@gmail.com> wrote:
> >
> > I think I found the information I was looking for:
> >
> > RecordWriter broadcasts each emitted watermark to all outgoing channels [1].
> >
> > StreamInputProcessor tracks the max watermark received on each incoming channel separately, and computes the task's watermark as the min of all incoming watermarks [2].
> >
> > Is this an accurate summary of Flink's watermark propagation?
> >
> > So in my previous example, each window count task is building up a count for each window based on incoming events' timestamps, and when all incoming watermarks have progressed beyond the end of the window, the count is emitted. So if one partition's watermark lags behind the other, it just means the window output is triggered based on this lagging watermark.
> >
> > -Zach
> >
> > [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
> > [2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147
> >
> >
> > On Thu, Feb 25, 2016 at 3:31 PM Zach Cox <zcox...@gmail.com> wrote:
> > Hi - how are watermarks passed along parallel tasks where there is a repartition? For example, say I have a simple streaming job computing hourly counts per key, something like this:
> >
> > val environment = StreamExecutionEnvironment.getExecutionEnvironment
> > environment.setParallelism(2)
> > environment.setStreamTimeCharacteristic(EventTime)
> > environment.getConfig.enableTimestamps()
> > environment
> >   .addSource(...)
> >   .assignAscendingTimestamps(_.timestamp)
> >   .keyBy("someField")
> >   .timeWindow(Time.hours(1))
> >   .fold(0, (count, element) => count + 1)
> >   .addSink(...)
> > environment.execute("example")
> >
> > Say the source has 2 parallel partitions (e.g. Kafka topic) and the events from the source contain timestamps, but over time the 2 source tasks diverge in event time (maybe 1 Kafka topic partition has many more events than the other).
> >
> > The job graph looks like this: http://imgur.com/hxEpF6b
> >
> > From what I can tell, the execution graph, with parallelism=2, would look like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to be used, so that events with the same key end up at the same window subtask, regardless of which source partition they came from.
> >
> > Since the watermarks are skewed between the parallel pipelines, what happens when differing watermarks are sent to the window count operators? Is something tracking the min incoming watermark there? Could anyone point me to Flink code that implements this? I'd really like to learn more about how this works.
> >
> > Thanks,
> > Zach
> >
> >
> >
Re: Watermarks with repartition
I think I found the information I was looking for:

RecordWriter broadcasts each emitted watermark to all outgoing channels [1].

StreamInputProcessor tracks the max watermark received on each incoming channel separately, and computes the task's watermark as the min of all incoming watermarks [2].

Is this an accurate summary of Flink's watermark propagation?

So in my previous example, each window count task is building up a count for each window based on incoming events' timestamps, and when all incoming watermarks have progressed beyond the end of the window, the count is emitted. So if one partition's watermark lags behind the other, it just means the window output is triggered based on this lagging watermark.

-Zach

[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
[2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147

On Thu, Feb 25, 2016 at 3:31 PM Zach Cox <zcox...@gmail.com> wrote:

> Hi - how are watermarks passed along parallel tasks where there is a
> repartition? For example, say I have a simple streaming job computing
> hourly counts per key, something like this:
>
> val environment = StreamExecutionEnvironment.getExecutionEnvironment
> environment.setParallelism(2)
> environment.setStreamTimeCharacteristic(EventTime)
> environment.getConfig.enableTimestamps()
> environment
>   .addSource(...)
>   .assignAscendingTimestamps(_.timestamp)
>   .keyBy("someField")
>   .timeWindow(Time.hours(1))
>   .fold(0, (count, element) => count + 1)
>   .addSink(...)
> environment.execute("example")
>
> Say the source has 2 parallel partitions (e.g. Kafka topic) and the events
> from the source contain timestamps, but over time the 2 source tasks
> diverge in event time (maybe 1 Kafka topic partition has many more events
> than the other).
>
> The job graph looks like this: http://imgur.com/hxEpF6b
>
> From what I can tell, the execution graph, with parallelism=2, would look
> like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to
> be used, so that events with the same key end up at the same window
> subtask, regardless of which source partition they came from.
>
> Since the watermarks are skewed between the parallel pipelines, what
> happens when differing watermarks are sent to the window count operators?
> Is something tracking the min incoming watermark there? Could anyone point
> me to Flink code that implements this? I'd really like to learn more about
> how this works.
>
> Thanks,
> Zach
>
>
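[The per-channel tracking summarized in that thread can be modeled without any Flink dependencies. This is an illustrative sketch of the behavior, not StreamInputProcessor's actual code: remember the max watermark seen on each input channel, and advance the task's own watermark to the min across all channels.]

```java
import java.util.Arrays;

// Sketch of min-of-inputs watermark propagation: a task's watermark
// only advances once every incoming channel has advanced, so a lagging
// upstream partition holds back window triggering downstream.
class WatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the new task watermark if it advanced, or null if unchanged. */
    Long onWatermark(int channel, long watermark) {
        // Per-channel watermarks are monotonically increasing; keep the max.
        if (watermark > channelWatermarks[channel]) {
            channelWatermarks[channel] = watermark;
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            if (min > currentWatermark) {
                currentWatermark = min;
                return currentWatermark;
            }
        }
        return null;
    }
}
```

[With two channels, the task watermark only moves once both channels have reported, and then only as far as the slower one — which is exactly the "triggered by the lagging watermark" behavior described above.]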
Watermarks with repartition
Hi - how are watermarks passed along parallel tasks where there is a repartition? For example, say I have a simple streaming job computing hourly counts per key, something like this:

val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(2)
environment.setStreamTimeCharacteristic(EventTime)
environment.getConfig.enableTimestamps()
environment
  .addSource(...)
  .assignAscendingTimestamps(_.timestamp)
  .keyBy("someField")
  .timeWindow(Time.hours(1))
  .fold(0, (count, element) => count + 1)
  .addSink(...)
environment.execute("example")

Say the source has 2 parallel partitions (e.g. Kafka topic) and the events from the source contain timestamps, but over time the 2 source tasks diverge in event time (maybe 1 Kafka topic partition has many more events than the other).

The job graph looks like this: http://imgur.com/hxEpF6b

From what I can tell, the execution graph, with parallelism=2, would look like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to be used, so that events with the same key end up at the same window subtask, regardless of which source partition they came from.

Since the watermarks are skewed between the parallel pipelines, what happens when differing watermarks are sent to the window count operators? Is something tracking the min incoming watermark there? Could anyone point me to Flink code that implements this? I'd really like to learn more about how this works.

Thanks,
Zach
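[The keyBy routing property asked about above only requires that the target channel be a deterministic function of the key alone. A toy illustration of that invariant — Flink actually hashes keys into key groups rather than using hashCode modulo directly, so this is not Flink's partitioner:]

```java
// Toy sketch: the channel an event is routed to depends only on its key,
// so equal keys emitted by different source subtasks converge on the
// same downstream window subtask.
class KeyRouter {
    static int targetChannel(Object key, int numChannels) {
        // floorMod keeps the result non-negative for any hashCode value.
        return Math.floorMod(key.hashCode(), numChannels);
    }
}
```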
Re:
Hi Ufuk - here is the jira issue with the requested information: https://issues.apache.org/jira/browse/FLINK-3483 -Zach On Tue, Feb 23, 2016 at 8:59 AM Ufuk Celebi <u...@apache.org> wrote: > Hey Zach! I'm not aware of an open issue for this. > > You can go ahead and open an issue for it. It will be very helpful to > include the following: > - exact Chrome and OS X version > - the execution plan as JSON (via env.getExecutionPlan()) > - screenshot > > Thanks! > > – Ufuk > > > On Tue, Feb 23, 2016 at 3:46 PM, Zach Cox <zcox...@gmail.com> wrote: > > Hi - I typically use the Chrome browser on OS X, and notice that with > > 1.0.0-rc0 the job graph visualization displays the nodes in the graph, > but > > not any of the edges. Also the graph does not move around when dragging > the > > mouse. > > > > The job graph visualization seems to work perfectly in Safari and > Firefox on > > OS X, however. > > > > Is this a known issue or should I open a jira ticket? > > > > Thanks, > > Zach > > >
[no subject]
Hi - I typically use the Chrome browser on OS X, and notice that with 1.0.0-rc0 the job graph visualization displays the nodes in the graph, but not any of the edges. Also the graph does not move around when dragging the mouse. The job graph visualization seems to work perfectly in Safari and Firefox on OS X, however. Is this a known issue or should I open a jira ticket? Thanks, Zach
Re: Using numberOfTaskSlots to control parallelism
Thanks for the input Aljoscha and Ufuk! I will try out the #2 approach and report back. Thanks, Zach On Sat, Feb 20, 2016 at 7:26 AM Ufuk Celebi wrote: > On Sat, Feb 20, 2016 at 10:12 AM, Aljoscha Krettek > wrote: > > IMHO the only change for 2) is that you possibly get better machine > utilization because it will use more parallel threads. So I think it’s a > valid approach. > > > > @Ufuk, could there be problems with the number of network buffers? I > think not, because the connections are multiplexed in one channel, is this > correct? > > I would not expect it to become a problem. If it does, it's easy to > resolve by throwing a little more memory at the problem. [1] > > – Ufuk > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers >
Using numberOfTaskSlots to control parallelism
What would the differences be between these scenarios? 1) one task manager with numberOfTaskSlots=1 and one job with parallelism=1 2) one task manager with numberOfTaskSlots=10 and one job with parallelism=10 In both cases all of the job's tasks get executed within the one task manager's jvm. Are there any downsides to doing #2 instead of #1? I ask this question because of current issues related to watermarks with Kafka sources [1] [2] and changing parallelism with savepoints [3]. I'm writing a Flink job that processes events from Kafka topics that have 12 partitions. I'm wondering if I should just set the job parallelism=12 and make numberOfTaskSlots sum to 12 across however many task managers I set up. It seems like watermarks would work properly then, and I could effectively change job parallelism using the number of task managers (e.g. 1 TM with slots=12, or 2 TMs with slots=6, or 12 TMs with slots=1, etc). Am I missing any important details that would make this a bad idea? It seems like a bit of abuse of numberOfTaskSlots, but also seems like a fairly simple solution to a few current issues. Thanks, Zach [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tt4782.html [2] https://issues.apache.org/jira/browse/FLINK-3375 [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-parallelism-tt4967.html
Re: Changing parallelism
Hi Ufuk - thanks for the 2016 roadmap - glad to see changing parallelism is the first bullet :) Mesos support also sounds great, we're currently running job and task managers on Mesos statically via Marathon. Hi Stephan - thanks, that trick sounds pretty clever, I will try wrapping my head around using 2 different jobs and uids like that. -Zach On Thu, Feb 18, 2016 at 7:13 AM Stephan Ewen <se...@apache.org> wrote: > Hi Zach! > > Yes, changing parallelism is pretty high up the priority list. The good > news is that "scaling in" is the simpler part of changing the parallelism > and we are pushing to get that in soon. > > > Until then, there is only a pretty ugly trick that you can do right now to > "rescale' the state: > > 1) savepoint with high parallelism > > 2) run an intermediate job that has the state twice in two operators, > once with high parallelism, once with low. Emit the state from the first > operator, write in the second. The first operator has the operator ID of > the initial high-parallelism state. > > 3) Run the low parallelism job, and the stateful operator needs the ID > of the second (low parallelism) operator in the intermediate job. > > > Greetings, > Stephan > > > On Thu, Feb 18, 2016 at 9:24 AM, Ufuk Celebi <u...@apache.org> wrote: > >> Hey Zach! >> >> Sounds like a great use case. >> >> On Wed, Feb 17, 2016 at 3:16 PM, Zach Cox <zcox...@gmail.com> wrote: >> > However, the savepoint docs state that the job parallelism cannot be >> changed >> > over time [1]. Does this mean we need to use the same, fixed >> parallelism=n >> > during reprocessing and going forward? Are there any tricks or >> workarounds >> > we could use to still make changes to parallelism and take advantage of >> > savepoints? >> >> Yes, currently you have to keep the parallelism fixed. Dynamic scale >> in and out of programs will have very high priority after the 1.0 >> release [1]. Unfortunately, I'm not aware of any work arounds to >> overcome this at the moment. 
>> >> – Ufuk >> >> [1] https://flink.apache.org/news/2015/12/18/a-year-in-review.html (at >> the end of the post there is a road map for 2016) >> > >
Re: Availability for the ElasticSearch 2 streaming connector
Awesome, thanks Suneel. :D I made the changes to support our use case, which needed flatMap behavior (index 2 docs, or zero docs, per incoming element) instead of map, and we also need to make either IndexRequest or UpdateRequest depending on the element. -Zach On Thu, Feb 18, 2016 at 2:06 AM Suneel Marthi <smar...@apache.org> wrote: > Thanks Zach, I have a few minor changes too locally; I'll push a PR out > tomorrow that has ur changes too. > > On Wed, Feb 17, 2016 at 5:13 PM, Zach Cox <zcox...@gmail.com> wrote: > >> I recently did exactly what Robert described: I copied the code from this >> (closed) PR https://github.com/apache/flink/pull/1479, modified it a >> bit, and just included it in my own project that uses the Elasticsearch 2 >> java api. Seems to work well. Here are the files so you can do the same: >> >> https://gist.github.com/zcox/59e486be7aeeca381be0 >> >> -Zach >> >> >> On Wed, Feb 17, 2016 at 4:06 PM Suneel Marthi <suneel.mar...@gmail.com> >> wrote: >> >>> Hey I missed this thread, sorry about that. >>> >>> I have a basic connector working with ES 2.0 which I can push out. Its >>> not optimized yet and I don't have the time to look at it, if someone would >>> like to take it over go ahead I can send a PR. >>> >>> On Wed, Feb 17, 2016 at 4:57 PM, Robert Metzger <rmetz...@apache.org> >>> wrote: >>> >>>> Hi Mihail, >>>> >>>> It seems that nobody is actively working on the elasticsearch2 >>>> connector right now. The 1.0.0 release is already feature frozen, only bug >>>> fixes or (some) pending pull requests go in. >>>> >>>> What you can always do is copy the code from our current elasticsearch >>>> connector, set the dependency to the version you would like to use and >>>> adopt our code to their API changes. I think it might take not much time to >>>> get it working. >>>> (The reason why we usually need more time for stuff like this are >>>> integration tests and documentation). >>>> >>>> Please let me know if that solution doesn't work for you. 
>>>> >>>> Regards, >>>> Robert >>>> >>>> >>>> On Tue, Feb 16, 2016 at 2:53 PM, Vieru, Mihail <mihail.vi...@zalando.de >>>> > wrote: >>>> >>>>> Hi, >>>>> >>>>> in reference to this ticket >>>>> https://issues.apache.org/jira/browse/FLINK-3115 when do you think >>>>> that an ElasticSearch 2 streaming connector will become available? Will it >>>>> make it for the 1.0 release? >>>>> >>>>> That would be great, as we are planning to use that particular version >>>>> of ElasticSearch in the very near future. >>>>> >>>>> Best regards, >>>>> Mihail >>>>> >>>> >>>> >>> >
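[For anyone curious what the flatMap-style change mentioned above looks like in shape, here's a stripped-down, ES-free sketch: the factory may return zero, one, or several actions per element, and chooses the action type (index vs. update) per element. All names here are made up for illustration; they are not the connector's API, and the real version would build Elasticsearch IndexRequest/UpdateRequest objects instead.]

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in for IndexRequest/UpdateRequest: records only the action
// type and document id, so the flatMap shape is visible.
class EsAction {
    final String type; // "index" or "update"
    final String docId;
    EsAction(String type, String docId) {
        this.type = type;
        this.docId = docId;
    }
}

// flatMap-style builder: per incoming element, emit any number of
// actions (including none), with the action type chosen case by case.
class EventRequestFactory {
    List<EsAction> requests(String event) {
        if (event == null || event.isEmpty()) {
            return Collections.emptyList(); // drop: emit zero documents
        }
        List<EsAction> actions = new ArrayList<EsAction>();
        actions.add(new EsAction("index", "event-" + event));   // raw event doc
        actions.add(new EsAction("update", "rollup-" + event)); // aggregate doc
        return actions;
    }
}
```

[A map-style builder would be the special case that always returns exactly one action; generalizing the sink to consume a list is what enables the two-docs-or-zero-docs behavior described above.]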
Re: Availability for the ElasticSearch 2 streaming connector
I recently did exactly what Robert described: I copied the code from this (closed) PR https://github.com/apache/flink/pull/1479, modified it a bit, and just included it in my own project that uses the Elasticsearch 2 java api. Seems to work well. Here are the files so you can do the same: https://gist.github.com/zcox/59e486be7aeeca381be0 -Zach On Wed, Feb 17, 2016 at 4:06 PM Suneel Marthi wrote: > Hey I missed this thread, sorry about that. > > I have a basic connector working with ES 2.0 which I can push out. Its > not optimized yet and I don't have the time to look at it, if someone would > like to take it over go ahead I can send a PR. > > On Wed, Feb 17, 2016 at 4:57 PM, Robert Metzger > wrote: > >> Hi Mihail, >> >> It seems that nobody is actively working on the elasticsearch2 connector >> right now. The 1.0.0 release is already feature frozen, only bug fixes or >> (some) pending pull requests go in. >> >> What you can always do is copy the code from our current elasticsearch >> connector, set the dependency to the version you would like to use and >> adopt our code to their API changes. I think it might take not much time to >> get it working. >> (The reason why we usually need more time for stuff like this are >> integration tests and documentation). >> >> Please let me know if that solution doesn't work for you. >> >> Regards, >> Robert >> >> >> On Tue, Feb 16, 2016 at 2:53 PM, Vieru, Mihail >> wrote: >> >>> Hi, >>> >>> in reference to this ticket >>> https://issues.apache.org/jira/browse/FLINK-3115 when do you think that >>> an ElasticSearch 2 streaming connector will become available? Will it make >>> it for the 1.0 release? >>> >>> That would be great, as we are planning to use that particular version >>> of ElasticSearch in the very near future. >>> >>> Best regards, >>> Mihail >>> >> >> >
Changing parallelism
Hi - we are building a stateful Flink streaming job that will run indefinitely. One part of the job builds up state per key in a global window that will need to exist for a very long time. We will definitely be using the savepoints to restore job state after new code deploys. We were planning to be able to increase the parallelism of the job incrementally over time, as the volume of input data grows. We also have a large amount of historical data loaded into Kafka we'd like to reprocess initially with the streaming job to backfill Elasticsearch, and then transition the job seamlessly to nearline processing. We were planning to use a large parallelism during the historical reprocessing, and then decrease it when the job has caught up to new events. However, the savepoint docs state that the job parallelism cannot be changed over time [1]. Does this mean we need to use the same, fixed parallelism=n during reprocessing and going forward? Are there any tricks or workarounds we could use to still make changes to parallelism and take advantage of savepoints? Thanks, Zach [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html#current-limitations