Re: Best Practice for Querying Flink State

2022-08-29 Thread Chen Qin
Hi Lu & Ken,

Flink is a stream processing engine (albeit a stateful one) that doesn't aim
to serve queries directly.
When it comes to serving systems, AFAIK, there are two camps of user
requirements:

- one that runs really simple queries (single-key indexing, like Dynamo),
serving a large number of reads/updates.
- one that runs complex queries (per-column indexing, like Pinot/Druid),
serving a small number of reads and small updates.

Given the segmented nature of serving systems, it is best to have Flink ingest
the inserts/updates of its state into whichever serving system fits the query.

One idea is to emit a CDC stream of Flink state changes (e.g. add/remove/update)
to a side output. From there, conditional updates to the serving system can be
implemented in a way that handles restarts of the Flink job.
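A rough sketch of that idea (Event, StateChange, the key selector, and the sink
are placeholders, not an existing API): a KeyedProcessFunction updates its state
and emits one change record per mutation to a side output, which is then wired
to whichever serving-system sink fits.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class StateCdcFunction extends KeyedProcessFunction<String, Event, Event> {

    // side output carrying one change record per state mutation
    public static final OutputTag<StateChange> CDC_TAG =
        new OutputTag<StateChange>("state-cdc") {};

    private transient ValueState<Long> counter;

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        Long previous = counter.value();
        long updated = (previous == null ? 0L : previous) + 1;
        counter.update(updated);
        // describe the state mutation (key, old value, new value) on the side output
        ctx.output(CDC_TAG, new StateChange(ctx.getCurrentKey(), previous, updated));
        out.collect(event);
    }
}

// Wiring: the main output continues as usual, the CDC stream feeds the serving
// system, ideally via idempotent/upsert writes so Flink job restarts are tolerated.
SingleOutputStreamOperator<Event> processed =
    events.keyBy(Event::getKey).process(new StateCdcFunction());
processed.getSideOutput(StateCdcFunction.CDC_TAG).addSink(servingSystemSink);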

Chen


On Mon, Aug 29, 2022 at 7:15 PM Ken Krugler 
wrote:

> Hi Lu,
>
> It would be helpful to know about your query requirements, before making a
> recommendation.
>
> E.g. does it just need to be a key-value store, and thus you’re querying
> by a single key (which has to match the state partitioning key)?
>
> What about latency requirements? E.g. if you’re processing Flink state
> (option 3) then this is going to be large.
>
> As a final take-away, in my experience I’ve always wound up shoving data
> into a separate system (Pinot is my current favorite) for queries.
>
> — Ken
>
>
> On Aug 29, 2022, at 3:19 PM, Lu Niu  wrote:
>
> Hi, Flink Users
>
> We have a use case that requires running ad hoc queries against Flink
> state. There are several options:
>
> 1. Dump flink state to external data systems, like kafka, s3 etc. from
> there we can query the data. This is a very straightforward approach, but
> adds system complexity and overall cost.
> 2. Flink Queryable State. This requires additional development and also
> when the job is down, we can not query the data, which violates the need
> for debugging in the first place. Last, from some channel I happen to know
> this feature is on the deprecation list.
> 3. Flink State API. This requires additional development.
>
> I am wondering what best practices are applied in production. For me,
> I really hope there is one product that 1. lets me query the Flink state
> using SQL and 2. is decoupled from the Flink job.
>
> Best
> Lu
>
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>


RE: Recommended pattern for implementing a DLQ with Flink+Kafka

2020-07-22 Thread Chen Qin
Could you be more specific on what "failed message" means here?

In general, a side output can do something like:

  def process(ele) {
    try {
      // business logic
      biz(ele)
    } catch {
      // route the element plus exception context to the side output
      sideOutput(ele + exception context)
    }
  }

  process(func).sideoutput(tag).addSink(kafkaSink)

Thanks,
Chen

From: Eleanore Jin
Sent: Wednesday, July 22, 2020 9:25 AM
To: Tom Fennelly
Cc: user
Subject: Re: Recommended pattern for implementing a DLQ with Flink+Kafka

+1 we have a similar use case for message schema validation.

Eleanore

On Wed, Jul 22, 2020 at 4:12 AM Tom Fennelly  wrote:

Hi. I've been searching blogs etc. trying to see if there are established
patterns/mechanisms for reprocessing failed messages via something like a DLQ.
I've read about using checkpointing and restarting tasks (not what we want,
because we want to keep processing forward) and then also how some use side
outputs to filter "bad" data to a DLQ-style topic. Kafka has dead letter topic
configs too, but it seems that can't really be used from inside Flink (from
what I can see). We're running Flink+Kafka. I'm new to Flink+Kafka, so I'm not
sure if there just isn't a defined pattern for it, or if I'm just not asking
the right questions in my searches. I searched the archives here and don't see
anything either, which obviously makes me think that I'm not thinking about
this in the "Flink way" :-|

Regards,
Tom.
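A rough Java sketch of the side-output DLQ pattern suggested above (Event, the
parse call, and the sink are placeholders, not a prescribed API):

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ValidatingFunction extends ProcessFunction<String, Event> {

    public static final OutputTag<String> DLQ_TAG = new OutputTag<String>("dead-letter") {};

    @Override
    public void processElement(String raw, Context ctx, Collector<Event> out) {
        try {
            // business logic / schema validation goes here
            out.collect(Event.parse(raw));
        } catch (Exception e) {
            // keep processing forward; route the bad record plus error context to the DLQ
            ctx.output(DLQ_TAG, raw + " | " + e.getMessage());
        }
    }
}

// main stream continues; the side output goes to a dead-letter Kafka topic
SingleOutputStreamOperator<Event> parsed = rawStream.process(new ValidatingFunction());
parsed.getSideOutput(ValidatingFunction.DLQ_TAG).addSink(deadLetterKafkaSink);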


Re: Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread Chen Qin
My two cents here:

- A Flink job already has back pressure, so in some use cases rate limiting can
be achieved by setting parallelism to a proper number. There is an open issue
about checkpointing reliability under back pressure; the community seems to be
working on it.

- Rate limiting can easily be abused and cause a lot of confusion. Think about a
use case where two streams do a simple interval join. Unless you are able to
rate limit both with proper values dynamically, you might see the timestamp
and watermark gaps keep increasing, causing checkpointing failures.

So the question might be: instead of looking at the rate limit of one source,
how do you slow down all sources without ever-increasing event-time/watermark
gaps? That already sounds complicated.

With that being said, if you really want to implement rate limiting on your
own, you can try the following code :) It works well for us.
public class SynchronousKafkaConsumer<T> extends FlinkKafkaConsumer<T> {

  protected static final Logger LOG =
      LoggerFactory.getLogger(SynchronousKafkaConsumer.class);

  private final double topicRateLimit;
  private transient RateLimiter subtaskRateLimiter;

  @Override
  public void open(Configuration configuration) throws Exception {
    // split the topic-level rate limit evenly across parallel subtasks
    Preconditions.checkArgument(
        topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks() > 0.1,
        "subtask ratelimit should be greater than 0.1 QPS");
    subtaskRateLimiter = RateLimiter.create(
        topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks());
    super.open(configuration);
  }

  @Override
  protected AbstractFetcher<T, ?> createFetcher(
      SourceContext<T> sourceContext,
      Map<KafkaTopicPartition, Long> partitionsWithOffsets,
      SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
      SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
      StreamingRuntimeContext runtimeContext,
      OffsetCommitMode offsetCommitMode,
      MetricGroup consumerMetricGroup,
      boolean useMetrics) throws Exception {

    return new KafkaFetcher<T>(
        sourceContext,
        partitionsWithOffsets,
        watermarksPeriodic,
        watermarksPunctuated,
        runtimeContext.getProcessingTimeService(),
        runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
        runtimeContext.getUserCodeClassLoader(),
        runtimeContext.getTaskNameWithSubtasks(),
        deserializer,
        properties,
        pollTimeout,
        runtimeContext.getMetricGroup(),
        consumerMetricGroup,
        useMetrics) {

      @Override
      protected void emitRecord(T record,
          KafkaTopicPartitionState<TopicPartition> partitionState,
          long offset) throws Exception {
        // block until this subtask's rate limiter grants a permit
        subtaskRateLimiter.acquire();
        if (record == null) {
          consumerMetricGroup.counter("invalidRecord").inc();
        }
        super.emitRecord(record, partitionState, offset);
      }

      @Override
      protected void emitRecordWithTimestamp(T record,
          KafkaTopicPartitionState<TopicPartition> partitionState,
          long offset, long timestamp) throws Exception {
        subtaskRateLimiter.acquire();
        if (record == null) {
          consumerMetricGroup.counter("invalidRecord").inc();
        }
        super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
      }
    };
  }
}
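For reference, usage would look roughly like this, assuming a constructor taking
the topic, deserialization schema, properties, and topic-level rate limit is
added to the class above (that constructor and the schema class are assumptions,
not part of the original code):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "my-group");

// 1000 records/sec for the whole topic, divided evenly across subtasks in open()
DataStream<Event> events = env.addSource(
    new SynchronousKafkaConsumer<>("events", new EventDeserializationSchema(), props, 1000.0));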
Thanks,

Chen
Pinterest Data


> On Jul 6, 2020, at 7:43 AM, David Magalhães  wrote:
> 
> I've noticed that FLINK-11501 was implemented in 
> flink-connector-kafka-0.10 [1], but not in the current version of the 
> flink-connector-kafka. Is there any reason for this, and what would be the 
> best way to implement rate limit functionality in the current Kafka 
> consumer?
> 
> Thanks,
> David
> 
> [1] 
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
>  
> 
> 
> [2] 
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
>  
> 


Re: Flink operator throttle

2020-05-17 Thread Chen Qin
Hi Ray,

From a somewhat abstract point of view, you can always throttle the source and
thereby get proper control of the sink throughput.
One approach might be to just override the base KafkaFetcher and use the
shaded Guava rate limiter.

https://github.com/apache/flink/blob/59714b9d6addb1dbf2171cab937a0e3fec52f2b1/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L347
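If the goal is mainly to protect a specific sink, another option (just a
sketch, not an existing connector API) is a pass-through operator with a Guava
RateLimiter placed right before the sink:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import com.google.common.util.concurrent.RateLimiter;  // or Flink's shaded copy

public class ThrottleFunction<T> extends RichMapFunction<T, T> {

    private final double recordsPerSecondTotal;
    private transient RateLimiter limiter;

    public ThrottleFunction(double recordsPerSecondTotal) {
        this.recordsPerSecondTotal = recordsPerSecondTotal;
    }

    @Override
    public void open(Configuration parameters) {
        // divide the overall rate across the parallel subtasks
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        limiter = RateLimiter.create(recordsPerSecondTotal / parallelism);
    }

    @Override
    public T map(T value) {
        limiter.acquire();  // blocks, which also creates back pressure upstream
        return value;
    }
}

// usage (sink and rate are placeholders):
stream.map(new ThrottleFunction<>(500.0)).addSink(elasticsearchSink);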

Best,

Chen


On Sat, May 16, 2020 at 11:48 PM Benchao Li  wrote:

> Hi,
>
> > If I want to use the rate limiter in other connectors, such as Kafka
> sink, ES sink, I need to do some more work on these connectors.
> Yes, you can do this by changing Kafka/ES sink, actually, this is how we
> did internally.
>
> > I'd like to know if the community has a plan to make a lower-level
> implementation for all connectors, also for table API and SQL?
> In my understanding, there is no on-going work on this. And usually we
> should leverage the back-pressure feature to do this.
> We can hear more from others whether this is a valid need.
>
> 王雷  于2020年5月17日周日 下午2:32写道:
>
>> Hi Benchao
>>
>> Thanks for your answer!
>>
>> According to your answer, I found `GuavaFlinkConnectorRateLimiter` which
>> is the implementation of the `FlinkConnectorRateLimiter`.
>>
>> If I want to use the rate limiter in other connectors, such as Kafka
>> sink, ES sink, I need to do some more work on these connectors.
>>
>> I'd like to know if the community has a plan to make a lower-level
>> implementation for all connectors, also for table API and SQL?
>>
>> Thanks
>> Ray
>>
>> Benchao Li  于2020年5月14日周四 下午5:49写道:
>>
>>> AFAIK, `FlinkKafkaConsumer010#setRateLimiter` can configure the kafka
>>> source to have a rate limiter.
>>> (I assume you use Kafka)
>>> However it only exists in Kafka 0.10 DataStream Connector, not in other
>>> versions nor table api.
>>>
>>> 王雷  于2020年5月14日周四 下午5:31写道:
>>>
 hi, All

 Does Flink support rate limitation?
 How can we limit the rate when the external database connected to the sink
 operator has a throughput limitation?
 Instead of relying on passive back pressure after reaching the limit of the
 external database, we want to limit the rate actively.

 Thanks
 Ray

>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: Flink consuming rate increases slowly

2020-05-10 Thread Chen Qin
Hi Eyal,

It's unclear what the warmup phase does in your use case. Usually we see a
Flink job start consuming at a high rate and then drop to a rate the downstream
can handle.

Thanks
Chen

> On May 10, 2020, at 12:25 AM, Eyal Pe'er  wrote:
> 
> Hi all,
> Lately I've added more resources to my Flink cluster which required a restart 
> of all apps.
> From the cluster side, the only change I made, is to add more task slots.
> On the cluster I have a streaming app that consumes from Kafka and sinks to 
> files.
> I noticed that since the restart, the application's "warmup" has been impacted 
> dramatically.
> Before, the change it took few minutes for the app to start and consume 
> normally (from my point of view, normally is a stable rate) - from 0 to 16K 
> events per second in 4 minutes.
> Now, after the change, it takes hours till it stabilizes on the normal 
> processing rate- from 0 to 12K events per second in 3 hours.
> The data source behavior hasn’t changed (same incoming rate, partitions, 
> servers etc.).
> I am aware of the backpressure mechanism in Flink, but it seems like it works 
> too slowly here.
> Is there a way to speed it up or control it? 
>  
> Thanks a lot
> Eyal Peer



Re: Bootstrapping

2018-01-25 Thread Chen Qin
Hi Gregory,

I had a similar issue when dealing with historical data. We chose a Lambda
architecture and worked out a use-case-specific hand-off protocol.
Unless the storage side can replay logs within a time range, streaming
application authors still need to do extra work to implement a batching
layer.

What we learned is that backfilling historical log streams might be too
expensive/inefficient for a streaming framework to handle, since streaming
frameworks focus on optimizing for unknown (unbounded) streams.

Hope it helps.

Chen

On Thu, Jan 25, 2018 at 12:49 PM, Gregory Fee  wrote:

> Hi group, I want to bootstrap some aggregates based on historic data in S3
> and then keep them updated based on a stream. To do this I was thinking of
> doing something like processing all of the historic data, doing a save
> point, then restoring my program from that save point but with a stream
> source instead. Does this seem like a reasonable approach or is there a
> better way to approach this functionality? There does not appear to be a
> straightforward way of doing it the way I was thinking so
> any advice would be appreciated.
>
> --
> *Gregory Fee*
> Engineer
> 425.830.4734 <+14258304734>
> [image: Lyft] 
>


Re: SideOutput doesn't receive anything if filter is applied after the process function

2018-01-16 Thread Chen Qin
Thanks Chesnay,

So I think that to support a multi-input and multi-output model like the
Dataflow paper describes, Flink needs to get credit-based scheduling and side
inputs ready, and introduce a new set of DataStream APIs that aren't constrained
by backwards-compatibility issues. Only then can we pass side outputs through to
the next operator and let the consumer decide what to do with them.

Yes, it might be a far reach, but that seems like one of the directions the
community could consider.

Chen


> On Jan 16, 2018, at 5:18 AM, Chesnay Schepler <ches...@apache.org> wrote:
> 
> I've opened https://issues.apache.org/jira/browse/FLINK-8437
> 
> Unfortunately i doubt we can fix this properly. The proposed solution will 
> not work if we ever allow arbitrary functions to use side-outputs.
> 
>> On 16.01.2018 08:59, Juho Autio wrote:
>> Could someone with knowledge of the right terms create this in JIRA, please? 
>> I guess I could also create it if needed..
>> 
>>> On Mon, Jan 15, 2018 at 3:15 PM, Chesnay Schepler <ches...@apache.org> 
>>> wrote:
>>> yes, i meant that process() returns the special operator. This would 
>>> definitely deserve a JIRA issue.
>>> 
>>> 
>>> On 15.01.2018 14:09, Juho Autio wrote:
>>>> Thanks for the explanation. Did you meant that process() would return a 
>>>> SingleOutputWithSideOutputOperator?
>>>> 
>>>> Any way, that should be enough to avoid the problem that I hit (and it 
>>>> also seems like the best & only way).
>>>> 
>>>> Maybe the name should be something more 
>>>> generic though, like ProcessedSingleOutputOperator or something, I 
>>>> wouldn't know..
>>>> 
>>>> Would this deserve an improvement ticket in JIRA?
>>>> 
>>>> On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler <ches...@apache.org> 
>>>> wrote:
>>>>> It would mean that getSideOutput() would return a 
>>>>> SingleOutputWithSideOutputOperator which extends SingleOutputOperator 
>>>>> offering getSideOutput(). Other transformations would still return a 
>>>>> SingleOutputOperator.
>>>>> 
>>>>> With this the following code wouldn't compile.
>>>>> 
>>>>> stream
>>>>> .process(...)
>>>>> .filter(...)
>>>>> .getSideOutput(...) // compile error
>>>>> 
>>>>> You would have to explicitly define the code as below, which makes the 
>>>>> behavior unambiguous:
>>>>> 
>>>>> processed = stream
>>>>> .process(...)
>>>>> 
>>>>> filtered = processed
>>>>> .filter(...)
>>>>> 
>>>>> filteredSideOutput = processed
>>>>> .getSideOutput(...)
>>>>> .filter(...)
>>>>> 
>>>>> 
>>>>> On 15.01.2018 09:55, Juho Autio wrote:
>>>>>> > sideoutput might deserve a seperate class which inherit form 
>>>>>> > singleoutput. It might prevent lot of confusions
>>>>>> 
>>>>>> Thanks, but how could that be done? Do you mean that if one calls 
>>>>>> .process(), then the stream would change to another class
>>>>>>which would only allow calls like 
>>>>>> .getMainOutput() or .getSideOutput("name")? Of course compile time error 
>>>>>> would be even better than a runtime error, but I don't see yet how it 
>>>>>> could be done in practice.
>>>>>> 
>>>>>>> On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnc...@gmail.com> wrote:
>>>>>>> Hi Juho,
>>>>>>> 
>>>>>>> I think sideoutput might deserve a seperate class which inherit form 
>>>>>>> singleoutput. It might prevent lot of confusions. A more generic 
>>>>>>> question is whether datastream api can be mulitple ins and mulitple 
>>>>>>> outs natively. It's more like scheduling problem when you come from 
>>>>>>> single process system to multiple process system, which one should get 
>>>>>>> resource and how much sharing same hardware resources, I guess it will 
>>>>>>> open gate to lots of edge cases -> strategies-> more edge cases :)
>>>>>>> 
>>>>>>> Chen
>>>&g

Re: SideOutput doesn't receive anything if filter is applied after the process function

2018-01-13 Thread Chen Qin
Hi Juho,

I think side output might deserve a separate class which inherits from the
single-output one. It might prevent a lot of confusion. A more generic question
is whether the DataStream API can support multiple ins and multiple outs
natively. It's much like the scheduling problem when you move from a
single-process system to a multi-process system: which one should get
resources, and how much of the same hardware resources is shared. I guess it
will open the gate to lots of edge cases -> strategies -> more edge cases :)

Chen

On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio  wrote:

> Maybe I could express it in a slightly different way: if adding the
> .filter() after .process() causes the side output to be somehow totally
> "lost", then I believe the .getSideOutput() could be aware that there is
> not such side output to be listened to from upstream, and throw an
> exception. I mean, this should be possible when building the DAG, it
> shouldn't require starting the stream to detect? Thanks..
>
> On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Juho,
>>
>> Now that I think of it this seems like a bug to me: why does the call to
>> getSideOutput succeed if it doesn't provide _any_ input?
>>
>>
>> With the way side outputs work, I don’t think this is possible (or would
>> make sense). An operator does not know whether or not it would ever emit
>> some element with a given tag.
>> As far as I understand it, calling `getSideOutput` essentially adds a
>> virtual node to the result stream graph that listens to the specified tag
>> from the upstream input.
>>
>> While I’m not aware whether or not your observation is expected behavior,
>> from an API perspective, I can see why it is a bit confusing for you.
>> Aljoscha would be the expert here, maybe he’ll have more insights. I’ve
>> looped him in cc’ed.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 12 January 2018 at 4:05:13 PM, Juho Autio (juho.au...@rovio.com)
>> wrote:
>>
>> When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed. If
>> I switch the position of .process() & .filter() (ie. filter first, then
>> process), both "a" & "b" are printed, as expected.
>>
>> I guess it's a bit hard to say what the side output should include in
>> this case: the stream before filtering or after it?
>>
>> What I would suggest is Flink to protect against this kind of a user
>> error that is hard to debug. Would it be possible that Flink throws an
>> exception if one tries to call .getSideOutput() on anything that doesn't
>> actually provide that side output? Now that I think of it this seems like a
>> bug to me: why does the call to getSideOutput succeed if it doesn't provide
>> _any_ input? I would expect it to get the side output data stream _before_
>> applying .filter().
>>
>> import org.apache.flink.api.common.functions.FilterFunction;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.datastream.SingleOutputStream
>> Operator;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEn
>> vironment;
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.util.OutputTag;
>>
>> public class SideOutputProblem {
>>
>> public static void main(String[] args) throws Exception {
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>> ExecutionEnvironment();
>> DataStreamSource stream = env.fromElements("a", "b");
>> OutputTag sideOutputTag = new
>> OutputTag("side-output"){};
>>
>> SingleOutputStreamOperator processed = stream
>>
>> .process(new ProcessFunction() {
>> @Override
>> public void processElement(String s, Context context,
>> Collector collector) throws Exception {
>> if ("a".equals(s)) {
>> collector.collect(s);
>> } else {
>> context.output(sideOutputTag, s);
>> }
>> }
>> })
>>
>> .filter(new FilterFunction() {
>> @Override
>> public boolean filter(String s) throws Exception {
>> return true;
>> }
>> });
>>
>> processed.getSideOutput(sideOutputTag).printToErr();
>>
>> processed.print();
>>
>> env.execute();
>> }
>>
>> }
>>
>> Cheers,
>> Juho
>>
>>
>


ayncIO & TM akka response

2017-12-09 Thread Chen Qin
Hi there,

Recently, our production Flink jobs have observed some weird performance issues.
When a job tailing a Kafka source failed and tried to catch up, the asyncIO
after the event trigger put a much higher load on the task thread. Since each
TM is allocated two virtual CPUs in Docker, my assumption was that akka
messages between JM and TM shouldn't be impacted.

What I observed was that the TM gets closed and keeps restarting with the same
error message below. Any suggestion is appreciated!


> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager '
> ​xxx
> /
> ​xxx
> :5841'. This might indicate that the remote task manager was lost.
> at org.apache.flink.runtime.io.network.netty.
> PartitionRequestClientHandler.channelInactive(
> PartitionRequestClientHandler.java:115)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(
> AbstractChannelHandlerContext.java:237)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(
> AbstractChannelHandlerContext.java:223)
> at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(
> ChannelInboundHandlerAdapter.java:75)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(
> AbstractChannelHandlerContext.java:237)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(
> AbstractChannelHandlerContext.java:223)
> at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(
> ByteToMessageDecoder.java:294)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(
> AbstractChannelHandlerContext.java:237)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(
> AbstractChannelHandlerContext.java:223)
> at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(
> DefaultChannelPipeline.java:829)
> at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(
> AbstractChannel.java:610)
> at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(
> SingleThreadEventExecutor.java:357)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:748)


​Chen​


Re: Data loss in Flink Kafka Pipeline

2017-12-07 Thread Chen Qin
Nishu,

You might consider a side output with metrics, at least after the window. I
would suggest having that in all Flink jobs to catch data skew or partition
skew, and amend if needed.
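Roughly what I mean (tag, metric, and class names are just examples):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

final OutputTag<Event> lateTag = new OutputTag<Event>("late-data") {};

SingleOutputStreamOperator<Result> windowed = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sideOutputLateData(lateTag)              // records behind the watermark land here
    .aggregate(new MyAggregate());

// count the late/dropped records so data loss and skew become visible
windowed.getSideOutput(lateTag)
    .map(new RichMapFunction<Event, Event>() {
        private transient Counter lateCounter;

        @Override
        public void open(Configuration parameters) {
            lateCounter = getRuntimeContext().getMetricGroup().counter("lateRecords");
        }

        @Override
        public Event map(Event e) {
            lateCounter.inc();
            return e;
        }
    })
    .addSink(lateDataSink);                   // or just .print() while debugging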

Chen

On Thu, Dec 7, 2017 at 9:48 AM Fabian Hueske  wrote:

> Is it possible that the data is dropped due to being late, i.e., records
> with timestamps behind the current watemark?
> What kind of operations does your program consist of?
>
> Best, Fabian
>
> 2017-12-07 10:20 GMT+01:00 Sendoh :
>
>> I would recommend to also print the count of input and output of each
>> operator by using Accumulator.
>>
>> Cheers,
>>
>> Sendoh
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
> --
Chen
Software Eng, Facebook


Re: question on sideoutput from ProcessWindow function

2017-09-23 Thread Chen Qin
Belated update:

actually @phoenixjiangnan is already working on this:
https://issues.apache.org/jira/browse/FLINK-7635



On Sat, Sep 23, 2017 at 8:26 AM, Ufuk Celebi <u...@apache.org> wrote:

> +1
>
> Created an issue here: https://issues.apache.org/jira/browse/FLINK-7677
>
>
> On Thu, Sep 14, 2017 at 11:51 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > Hi,
> >
> > Chen is correct! I think it would be nice, though, to also add that
> > functionality for ProcessWindowFunction and I think this should be easy
> to
> > do since the interface is very similar to ProcessFunction and we could
> also
> > add that to the Context.
> >
> > Best,
> > Aljoscha
> >
> > On 9. Sep 2017, at 06:22, Chen Qin <qinnc...@gmail.com> wrote:
> >
> > Hi Prabhu,
> >
> > That is good question, the short answer is not yet, only ProcessFunction
> was
> > given flexibility of doing customized sideoutput at the moment.
> > Window Function wasn't given such flexibility partially due to sideoutput
> > initially targeting late arriving event for window use cases.
> >
> > +@Aljoscha might have better picture on this question.
> >
> > Thanks,
> > Chen
> >
> > On Fri, Sep 8, 2017 at 7:19 PM, Prabhu V <vpra...@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> Can we have a side output from a process window function ?
> >>
> >> I am currently genrating a side output from a process function. The main
> >> output of the process function is then Windowed and a
> ProcessWindowFunction
> >> is applied on the windows.
> >>
> >> Can I add to the SideOutpuStream from the ProcessWindowFunction. I am
> >> unable to find any api that enables this functionality.
> >>
> >> Thanks,
> >> Prabhu
> >
> >
> >
>


Re: ETL with changing reference data

2017-09-17 Thread Chen Qin
Hi Peter,

If I understand correctly, I think you are facing a dilemma between efficient
dynamic reference data and scalable processing.
I don't have an answer for how things would work in your specific case, but
this is an interesting topic to discuss.

Fabian provided good insights and I would like to add to his idea. I think you
already pointed it out in 1):

Can the reference-data updates come with meaningful event timestamps and be
merged into the main stream's event time?
If so, can you buffer the enrichment data as well as the main pipeline data in
windows, and possibly do a retraction at a certain point in time via control
messages?
That generally means less efficient state management, and it varies whether the
extra overhead is acceptable. Thanks to incremental checkpointing, this is less
of a problem compared to 1.2.
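As a rough sketch of staying within Flink primitives (all class and field names
here are placeholders): treat reference-data updates as a second stream, key
both streams by the reference key, and keep the latest reference value in keyed
state:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Keyed enrichment: both inputs are keyed by the reference key, the latest
// reference value is held in keyed state and applied to main-stream records.
public class EnrichFunction extends RichCoFlatMapFunction<Record, RefUpdate, EnrichedRecord> {

    private transient ValueState<RefUpdate> reference;

    @Override
    public void open(Configuration parameters) {
        reference = getRuntimeContext().getState(
            new ValueStateDescriptor<>("reference", RefUpdate.class));
    }

    @Override
    public void flatMap1(Record record, Collector<EnrichedRecord> out) throws Exception {
        RefUpdate ref = reference.value();
        // a real job has to decide what to do while the reference data is missing or stale
        out.collect(EnrichedRecord.of(record, ref));
    }

    @Override
    public void flatMap2(RefUpdate update, Collector<EnrichedRecord> out) throws Exception {
        reference.update(update);  // reference changes arrive as ordinary events
    }
}

// wiring (keyBy on both sides so the state is co-partitioned):
mainStream.keyBy(Record::getRefKey)
    .connect(refUpdates.keyBy(RefUpdate::getKey))
    .flatMap(new EnrichFunction());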

I would not recommend solving the problem outside of Flink primitives, as you
will find yourself rebuilding everything Flink has already tried to solve.
That usually ends up being a customized streaming system of your own.

Thanks,
Chen

On Thu, Sep 14, 2017 at 6:36 AM, Fabian Hueske  wrote:

> Hi Peter,
>
> in principle, joining the data stream with the reference data would be the
> most natural approach to enrich data.
> However, stream joins for the Table API / SQL are currently under
> development and not available yet.
>
> You can of course try to solve the issue manually using UDFs but this will
> require many tweaks and might be fragile.
> I would setup the input as a union of two tables. The first is the regular
> data stream, the second one is used to feed in records that need to be
> reprocessed.
> The enriching UDFs could poll for updates and load them.
> In this approach, you need to somehow synchronize updating the reference
> data and feeding in the records to reprocess to ensure that all functions
> have the latest version of the data when reprocessing.
>
> Best, Fabian
>
>
>
> 2017-09-11 23:01 GMT+02:00 Peter Lappo :
>
>> Thanks Chen
>> We add our reference data to a JVM global hash map, one map per reference
>> data type (we found flink table joins were too slow as they were doing a
>> table scan) so a side pipeline to update the reference data is a nice idea
>> but may suffer from concurrency issues. If there are pending records to
>> process these may get processed before the reference data update especially
>> if fetching reference data is slow, as is in our case.
>>
>> Having said that processing reference data sequentially in the main
>> pipeline doesn’t help either if there is more than one parallel data stream.
>>
>>
>> On 11 Sep 2017, at 02:29, qinnc...@gmail.com wrote:
>>
>> Hi Peter,
>>
>> I think what you referred is typical amendment process where partial or
>> all results need to modified. I think it is definitely interesting topic!
>> Here is my two cents
>>
>> In ideal world, reference data source can ingest updated used values as
>> events and join with buffered events in windows . (it’s a bit counter
>> intuitive, but think there is a magic function where we ingest all
>> reference data as stream instead of doing on demand rpc)
>>
>> Unfortunately, in lots of use cases, it seems hard to know exactly how
>> reference data source used and dump reference data costs too much. So
>> replay pipeline might be cheapest way to get things done in general.
>>
>> In some cases,  results are partitioned and bounded. It makes possible to
>> recomputed within bounded windows, that may requires a bit work to
>> customize window which hold longer than watermark pass its endtime. I
>> remember there was a Jira talk about retraction.
>> In other cases, results are derived from long history which makes not
>> rationale to keep. A side pipeline capture those events with late arriving
>> event handling might interact with external storage and amend results.
>>
>> Thanks,
>> Chen
>>
>>
>> *From: *Peter Lappo 
>> *Sent: *Sunday, September 10, 2017 3:00 PM
>> *To: *user@flink.apache.org
>> *Subject: *ETL with changing reference data
>>
>> hi,
>> We are building an ETL style application in Flink that consumes records
>> from a file or a message bus as a DataStream. We are transforming records
>> using SQL and UDFs. The UDF loads reference data in the open method and
>> currently the data loaded remains in memory until the job is cancelled. The
>> eval method of the UDF is used to do the actual transformation on a
>> particular field.
>> So of course reference data changes and data will need to reprocessed.
>> Lets assume we can identify and resubmit records for reprocessing what is
>> the best design that
>> * keeps the Flink job running
>> * reloads the changed reference data
>> so that records are reprocessed in a deterministic fashion
>>
>> Two options spring to mind
>> 1) send a control record to the stream that reloads reference data or
>> part of it and ensure resubmitted records are processed after the reload
>> message
>> 2) use a 

Re: question on sideoutput from ProcessWindow function

2017-09-08 Thread Chen Qin
Hi Prabhu,

That is a good question. The short answer is not yet; only ProcessFunction has
been given the flexibility of emitting customized side output at the moment.
Window functions weren't given such flexibility, partially because side output
initially targeted late-arriving events for window use cases.

+@Aljoscha might have better picture on this question.

Thanks,
Chen

On Fri, Sep 8, 2017 at 7:19 PM, Prabhu V  wrote:

> Hi,
>
> Can we have a side output from a process window function ?
>
> I am currently genrating a side output from a process function. The main
> output of the process function is then Windowed and a ProcessWindowFunction
> is applied on the windows.
>
> Can I add to the SideOutpuStream from the ProcessWindowFunction. I am
> unable to find any api that enables this functionality.
>
> Thanks,
> Prabhu
>


Re: GRPC source in Flink

2017-07-31 Thread Chen Qin
Hi Basanth,

Given the fact that Flink bases its failure-recovery guarantee on checkpointing
and source rewinding, I can imagine a lossless RPC source would be tricky. In
essence, any RPC source needs to provide a rewind API or buffer at least back
to the last successful checkpoint.
In production use cases, putting the data received from the RPC server into a
distributed queue and consuming from that queue might be the better idea.

Thanks,
Chen


On Mon, Jul 31, 2017 at 5:13 PM, Basanth Gowda 
wrote:

> Hi,
>
>
> Is there a way to get data from GRPC source in flink.  If we can how we
> guarantee that events are not lost once submitted to Flink.
>
> thank you
>


Re: Checkpoints very slow with high backpressure

2017-05-31 Thread Chen Qin
What is the root cause of the back pressure?
The reason I ask is that we investigated and applied metrics to measure the
time to process an event, and ended up finding the bottleneck in frequent
managed-state updates. Our approach was to keep an in-memory cache and
periodically update the state before the checkpointing cycle kicks in.

This thread might somehow related.
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/large-sliding-window-perf-question-td13277.html#none

Chen

On Wed, May 31, 2017 at 7:19 PM, SHI Xiaogang 
wrote:

> Hi rhashmi
>
> We are also experiencing slow checkpoints when there exist back pressure.
> It seems there is no good method to handle back pressure now.
>
> We work around it by setting a larger number of checkpoint timeout. The
> default value is 10min. But checkpoints usually take more time to complete
> when there exists back pressure.  You can set it via `CheckpointConfig#
> setCheckpointTimeout()`.
>
> Regards,
> Xiaogang
>
>
>
> 2017-06-01 5:36 GMT+08:00 rhashmi :
>
>> So what is the resolution? flink consuming messages from kafka. Flink went
>> down about a day ago, so now flink has to process 24 hour worth of events.
>> But i hit backpressure, as of right now checkpoint are timing out. Is
>> there
>> any recommendation how to handle this situation?
>>
>> It seems like triggers are also not firing, so no updates are being made to
>> the downstream database.
>>
>> is there recommended approach to handle backpressure?
>>
>> Version Flink 1.2.
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Checkpoints-very-
>> slow-with-high-backpressure-tp12762p13411.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: large sliding window perf question

2017-05-29 Thread Chen Qin
BTW, it might be better to pre-aggregate via a sliding window with a controlled
bucket size, batched updates, and retention.

Thanks,
Chen

> On May 29, 2017, at 3:05 PM, Chen Qin <qinnc...@gmail.com> wrote:
> 
> I see, not sure this this hack works. It utilize operator state to hold all 
> <key, states> mapping assigned to that operator instance.
> 
> If key by can generate determined mapping between upstream events to fixed 
> operator parallelism, then the operator state could hold mapping between keys 
>  and their states, updates only needed when snapshot triggered.(dump cache to 
> operator state) I don’t use timer in this case, but keep a last emit map 
> (keyed by event key) to track when to flush downstream within processFunction.
> 
> 
> Thanks,
> Chen
> 
> 
>> On May 29, 2017, at 2:38 AM, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> 
>> Hi Chen,
>> 
>> How to you update the ValueState during checkpointing. I’m asking because a 
>> keyed state should always be scoped to a key and when checkpointing there is 
>> no key scope because we are not processing any incoming element and we’re 
>> not firing a timer (the two cases where we have a key scope).
>> 
>> Best,
>> Aljoscha
>> 
>>> On 24. May 2017, at 21:05, Chen Qin <qinnc...@gmail.com 
>>> <mailto:qinnc...@gmail.com>> wrote:
>>> 
>>> Got it! Looks like 30days window and trigger 10seconds is way too many 
>>> (quarter million every 10 seconds per key, around 150 keys). 
>>> 
>>> Just to add some background, I tried three ways to implement this large 
>>> sliding window pipeline, all share same configuration and use rocksdb 
>>> statebackend remote to s3
>>> out of box sliding window 30days 10s trigger
>>> processfunction with list state
>>> process function with in memory cache, update valuestate during checkpoint, 
>>> filter & emits list of events periodically. Value state checkpoint as blob 
>>> seems complete quickly.
>>> First two options see perf issue, third one so far works fine.
>>> 
>>> Thanks,
>>> Chen
>>> 
>>> On Wed, May 24, 2017 at 8:24 AM, Stefan Richter 
>>> <s.rich...@data-artisans.com <mailto:s.rich...@data-artisans.com>> wrote:
>>> Yes Cast, I noticed your version is already 1.2.1, which is why I contacted 
>>> Aljoscha to take a look here because he knows best about the expected 
>>> scalability of the sliding window implementation.
>>>  
>>>> Am 24.05.2017 um 16:49 schrieb Carst Tankink <ctank...@bol.com 
>>>> <mailto:ctank...@bol.com>>:
>>>> 
>>>> Hi,
>>>>  
>>>> Thanks Aljoshcha!
>>>> To complete my understanding: the problem here is that each element in the 
>>>> sliding window(s) basically triggers 240 get+put calls instead of just 1, 
>>>> right? I can see how that blows up :-) 
>>>> I have a good idea on how to proceed next, so I will be trying out writing 
>>>> the custom ProcessFunction next (week).
>>>>  
>>>> Stefan, in our case we are already on Flink 1.2.1 which should have the 
>>>> patched version of RocksDB, right? Because that patch did solve an issue 
>>>> we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink 
>>>> which was stalling quite often under Flink 1.2.0) but did not solve this 
>>>> case, which fits the “way too much RocksDB access” explanation better.
>>>>  
>>>>  
>>>> Thanks again,
>>>> Carst
>>>>  
>>>> From: Aljoscha Krettek <aljos...@apache.org <mailto:aljos...@apache.org>>
>>>> Date: Wednesday, May 24, 2017 at 16:13
>>>> To: Stefan Richter <s.rich...@data-artisans.com 
>>>> <mailto:s.rich...@data-artisans.com>>
>>>> Cc: Carst Tankink <ctank...@bol.com <mailto:ctank...@bol.com>>, 
>>>> "user@flink.apache.org <mailto:user@flink.apache.org>" 
>>>> <user@flink.apache.org <mailto:user@flink.apache.org>>
>>>> Subject: Re: large sliding window perf question
>>>>  
>>>> Hi, 
>>>>  
>>>> I’m afraid you’re running into a general shortcoming of the current 
>>>> sliding windows implementation: every sliding window is treated as its own 
>>>> window that has window contents and trigger state/timers. For example, if 
>>>> you have a 

Re: large sliding window perf question

2017-05-29 Thread Chen Qin
I see. Not sure whether this hack works: it utilizes operator state to hold the
whole <key, state> mapping assigned to that operator instance.

If keyBy can generate a deterministic mapping from upstream events to a fixed
operator parallelism, then the operator state could hold the mapping between
keys and their states, with updates only needed when a snapshot is triggered
(dump the cache into operator state). I don't use timers in this case, but keep
a last-emit map (keyed by event key) to track when to flush downstream within
the processFunction.


Thanks,
Chen


> On May 29, 2017, at 2:38 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi Chen,
> 
> How to you update the ValueState during checkpointing. I’m asking because a 
> keyed state should always be scoped to a key and when checkpointing there is 
> no key scope because we are not processing any incoming element and we’re not 
> firing a timer (the two cases where we have a key scope).
> 
> Best,
> Aljoscha
> 
>> On 24. May 2017, at 21:05, Chen Qin <qinnc...@gmail.com 
>> <mailto:qinnc...@gmail.com>> wrote:
>> 
>> Got it! Looks like 30days window and trigger 10seconds is way too many 
>> (quarter million every 10 seconds per key, around 150 keys). 
>> 
>> Just to add some background, I tried three ways to implement this large 
>> sliding window pipeline, all share same configuration and use rocksdb 
>> statebackend remote to s3
>> out of box sliding window 30days 10s trigger
>> processfunction with list state
>> process function with in memory cache, update valuestate during checkpoint, 
>> filter & emits list of events periodically. Value state checkpoint as blob 
>> seems complete quickly.
>> First two options see perf issue, third one so far works fine.
>> 
>> Thanks,
>> Chen
>> 
>> On Wed, May 24, 2017 at 8:24 AM, Stefan Richter <s.rich...@data-artisans.com 
>> <mailto:s.rich...@data-artisans.com>> wrote:
>> Yes Cast, I noticed your version is already 1.2.1, which is why I contacted 
>> Aljoscha to take a look here because he knows best about the expected 
>> scalability of the sliding window implementation.
>>  
>>> Am 24.05.2017 um 16:49 schrieb Carst Tankink <ctank...@bol.com 
>>> <mailto:ctank...@bol.com>>:
>>> 
>>> Hi,
>>>  
>>> Thanks Aljoshcha!
>>> To complete my understanding: the problem here is that each element in the 
>>> sliding window(s) basically triggers 240 get+put calls instead of just 1, 
>>> right? I can see how that blows up :-) 
>>> I have a good idea on how to proceed next, so I will be trying out writing 
>>> the custom ProcessFunction next (week).
>>>  
>>> Stefan, in our case we are already on Flink 1.2.1 which should have the 
>>> patched version of RocksDB, right? Because that patch did solve an issue we 
>>> had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which 
>>> was stalling quite often under Flink 1.2.0) but did not solve this case, 
>>> which fits the “way too much RocksDB access” explanation better.
>>>  
>>>  
>>> Thanks again,
>>> Carst
>>>  
>>> From: Aljoscha Krettek <aljos...@apache.org <mailto:aljos...@apache.org>>
>>> Date: Wednesday, May 24, 2017 at 16:13
>>> To: Stefan Richter <s.rich...@data-artisans.com 
>>> <mailto:s.rich...@data-artisans.com>>
>>> Cc: Carst Tankink <ctank...@bol.com <mailto:ctank...@bol.com>>, 
>>> "user@flink.apache.org <mailto:user@flink.apache.org>" 
>>> <user@flink.apache.org <mailto:user@flink.apache.org>>
>>> Subject: Re: large sliding window perf question
>>>  
>>> Hi, 
>>>  
>>> I’m afraid you’re running into a general shortcoming of the current sliding 
>>> windows implementation: every sliding window is treated as its own window 
>>> that has window contents and trigger state/timers. For example, if you have 
>>> a sliding window of size 4 hours with 1 minute slide this means each 
>>> element is in 240 windows and you basically amplify writing to RocksDB by 
>>> 240. This gets out of hand very quickly with larger differences between 
>>> window side and slide interval.
>>>  
>>> I’m also afraid there is no solution for this right now so the workaround 
>>> Chen mentioned is the way to go right now.
>>>  
>>> Best,
>>> Aljoscha
>>> On 24. May 2017, at 14:07, Stefan Richter <s.rich...@data-artisans.com 
>>> <mailto:s.rich...@data-artisans.com&

Re: large sliding window perf question

2017-05-24 Thread Chen Qin
more
> than 2seconds even for the larger states.
>
>
> At this point, I’m a bit at a loss to figure out what’s going on. My best
> guess is it has to do with the state access to the RocksDBFoldingState, but
> why this so slow is beyond me.
>
> Hope this info helps in figuring out what is going on, and hopefully it is
> actually related to Chen’s case :)
>
>
> Thanks,
> Carst
>
> *From: *Stefan Richter <s.rich...@data-artisans.com>
> *Date: *Tuesday, May 23, 2017 at 21:35
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: large sliding window perf question
>
> Hi,
>
> Which state backend and Flink version are you using? There was a problem
> with large merging states on RocksDB, caused by some inefficiencies in the
> merge operator of RocksDB. We provide a custom patch for this with all
> newer versions of Flink.
>
> Best,
> Stefan
>
>
> Am 23.05.2017 um 21:24 schrieb Chen Qin <qinnc...@gmail.com>:
>
> Hi there,
>
> I have seen some weird perf issue while running event time based job with
> large sliding window (24 hours offset every 10s)
>
> pipeline looks simple,
> tail kafka topic and assign timestamp and watermark, forward to large
> sliding window (30days) and fire every 10 seconds and print out.
>
> what I have seen first hand was checkpointing stuck, took longer than
> timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems
> back pressure kick in and window operator consumes message really slowly
> and throttle sources.
>
> I also tried to limit window time to mins and all issues are gone.
>
> Any suggestion on this. My work around is I implemented processFunction
> and keep big value state, periodically evaluate and emit downstream
> (emulate what sliding window does)
>
> Thanks,
> Chen
>
>
>
>
>
>
>
>
>
>
>
>


large sliding window perf question

2017-05-23 Thread Chen Qin
Hi there,

I have seen some weird perf issues while running an event-time based job with a
large sliding window (24 hours, sliding every 10s).

The pipeline looks simple:
tail a Kafka topic, assign timestamps and watermarks, forward to a large
sliding window (30 days) that fires every 10 seconds, and print out.

What I saw first hand was checkpointing getting stuck, taking longer than the
timeout despite the traffic volume being low (~300 TPS). Looking deeper, it
seems back pressure kicks in and the window operator consumes messages really
slowly and throttles the sources.

I also tried limiting the window size to minutes, and all issues are gone.

Any suggestions on this? My workaround is that I implemented a processFunction
that keeps a big value state and periodically evaluates and emits downstream
(emulating what the sliding window does); see the sketch below.
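A rough sketch of that workaround (type names are placeholders; eviction of
entries older than the 30-day window is elided):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class RollingAggregateFunction extends ProcessFunction<Event, Aggregate> {

    private static final long EMIT_INTERVAL_MS = 10_000L;

    private transient ValueState<Aggregate> aggregate;   // the "big value state"
    private transient ValueState<Long> lastEmit;         // last flush time per key

    @Override
    public void open(Configuration parameters) {
        aggregate = getRuntimeContext().getState(
            new ValueStateDescriptor<>("rolling-aggregate", Aggregate.class));
        lastEmit = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-emit", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Aggregate> out) throws Exception {
        Aggregate current = aggregate.value();
        current = (current == null ? Aggregate.empty() : current).add(event);
        // a real implementation would also evict entries older than the window here
        aggregate.update(current);

        long now = ctx.timerService().currentProcessingTime();
        Long last = lastEmit.value();
        if (last == null || now - last >= EMIT_INTERVAL_MS) {
            out.collect(current);   // periodic flush downstream instead of a window firing
            lastEmit.update(now);
        }
    }
}

// usage: events.keyBy(Event::getKey).process(new RollingAggregateFunction())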

Thanks,
Chen


Re: Incremental checkpoint branch

2017-03-03 Thread Chen Qin
Hi Vishnu,

My best guess is that there are lots of customized "incremental checkpointing"
implementations done via patches around the RocksDB state backend and RocksDB
checkpoints.

http://rocksdb.org/blog/2015/11/10/use-checkpoints-for-efficient-snapshots.html

Thanks,
Chen

On Fri, Mar 3, 2017 at 1:16 PM, Ted Yu  wrote:

> I checked https://github.com/apache/flink and FLINK-5053
>
> However, I haven't found such information.
>
> On Fri, Mar 3, 2017 at 1:14 PM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>> Hi,
>>
>> Can someone point me to the branch where the ongoing work for incremental
>> checkpoint is going on, I would like to try it out even if the work is not
>> complete.
>>
>> I have a use case where the state size increase about ~1gb every 5
>> minutes.
>>
>> Thanks,
>> Vishnu
>>
>
>


Re: Side outputs

2017-02-08 Thread Chen Qin
Hi Billy,

Without looking into the detail of how the batch API works, I think the filter
approach might not be the most efficient way in general to implement
conditional branching in a topology. So this may not answer your question in
terms of perf improvement.

If you are willing to use streaming api, you might consider FLINK-4460

Thanks,
Chen

On Wed, Feb 8, 2017 at 9:04 AM, Newport, Billy  wrote:

> I’ve implemented side outputs right now using an enum approach as
> recommended be others. Basically I have a mapper which wants to generate 4
> outputs (DATA, INSERT, UPDATES, DELETE).
>
>
>
> It emits a Tuple2 right now and I use 4 following filters
> to write each ‘stream’ to a different parquet file.
>
>
>
> It’s basically a cogroup followed by 4 filters followed by 4 parquet sinks.
>
>
>
> The performance seems very bad. If we remove the filters and simply write
> the output of cogroup to parquet then it’s runs in half the current time.
> So the filter+parquet is as expensive as cogroup which is what has me
> puzzled.
>
>
>
> The flow is basically
>
>
>
> DataSet staging = avro File A;
>
> DataSet previousData1 = avro File A; (same file)
>
> DataSet live = previousData1.filter(liveFilter)
>
> DataSet previousData2;
>
> DataSet dead = previousData2.filter(deadFilter)
>
> DataSet mergedCombine = live.coGroup(staging)
>
> DataSet data = mergedCombine.filter(DATA)
>
> DataSet dataPlusDead = data.union(dead)
>
> dataPlusDead.write to parquet
>
> DataSet inserts = mergedCombine.filter(INSERT)
>
> Inserts.write to parquet
>
> DataSet updates = mergedCombine.filter(UPDATE)
>
> Updates.write to parquet
>
> DataSet deletes = mergedCombine.filter(DELETE)
>
> Deletes.write to parquet.
>
>
>
> First, reading  live and dead is taking a very long time relative to it’s
> expected cost. Second, the cogroup seems to be slow when combine with the 4
> filter/saves. Removing the filter/saves reduces cogroup time by half
> (including single write with no filters).
>
>
>
> Any ideas on optimizing this?
>
>
>
>
>
> *Billy Newport*
>
> Data Architecture, Goldman Sachs & Co.
> 30 Hudson | 37th Floor | Jersey City, NJ
>
> Tel:  +1 (212) 8557773 <(212)%20855-7773> |  Cell:  +1 (507) 254-0134
> <(507)%20254-0134>
> Email: billy.newp...@gs.com , KD2DKQ
>
>
>


complete digraph

2017-02-07 Thread Chen Qin
Hi there,

I don't think this is an urgent topic, but it definitely seems like an
interesting one to me.

Is a Flink topology able to run a complete digraph
(excluding sources and
sinks)? The use case is more around supporting event-based arbitrary state
transitions within a Flink job.

Thank,
Chen


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-26 Thread Chen Qin
We worked around S3 and had a beer with our Hadoop engineers...



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-snapshotting-to-S3-Timeout-waiting-for-connection-from-pool-tp10994p11330.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: multi tenant workflow execution

2017-01-24 Thread Chen Qin
Hi Fabian,

AsyncFunction and ProcessFunction do help!

I assume the per-event timers I create in a RichProcessFunction implementation
will be part of the key-grouped state and cached in memory at runtime, right? I
am interested in this because we are targeting a large deployment with a
million-TPS event source, and I would like to understand the checkpoint size
and speed implications.

How about checkpointing an iteration stream? Can we achieve at-least-once
semantics in 1.2 for iteration jobs?

Thanks,
Chen

On Tue, Jan 24, 2017 at 2:26 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Chen,
>
> if you plan to implement your application on top of the upcoming Flink
> 1.2.0 release, you might find the new AsyncFunction [1] and the
> ProcessFunction [2] helpful.
> AsyncFunction can be used for non-blocking calls to external services and
> maintains the checkpointing semantics.
> ProcessFunction allows to register and react to timers. This might easier
> to use than a window for the 24h timeout.
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/dev/stream/asyncio.html
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/dev/stream/process_function.html
>
> 2017-01-24 0:41 GMT+01:00 Chen Qin <qinnc...@gmail.com>:
>
>> Hi there,
>>
>> I am researching running one flink job to support customized event driven
>> workflow executions. The use case is to support running various workflows
>> that listen to a set of kafka topics and performing various rpc checks, a
>> user travel through multiple stages in a rule execution(workflow
>> execution). e.g
>>
>> kafka topic : user click stream
>> rpc checks:
>>
>> if user is member,
>> if user has shown interest of signup
>>
>>
>> ​workflows:
>> ​
>>
>> workflow 1: user click -> if user is member do A then do B
>> workflow 2: user click -> if user has shown interest of signup then do A
>> otherwise wait for 60 mins and try recheck, expire in 24 hours
>>
>> The goal is as I said to run workflow1 & workflow2 in one flink job.
>>
>> Initial thinking describes below
>>
>> sources are series of kafka topics, all events go through coMap,cache
>> lookup event -> rules mapping and fan out to multiple {rule, user} tuple.
>> Based on rule definition and stage user is in a given rule, it do series of
>> async rpc check and side outputs to various of sinks.
>>
>>- If a {rule, user} tuple needs to stay in a operator states longer
>>(1 day), there should be a window following async rpc checks with
>>customized purgetrigger firing those passes and purge either pass check or
>>expired tuples.
>>- If a {rule, user} execute to a stage which waits for a kafka event,
>>it should be added to cache and hookup with coMap lookups near sources
>>
>>
>>  Does that makes sense?
>>
>> Thanks,
>> Chen
>>
>>
>>
>


multi tenant workflow execution

2017-01-23 Thread Chen Qin
Hi there,

I am researching running one Flink job to support customized event-driven
workflow executions. The use case is to support running various workflows
that listen to a set of Kafka topics and perform various RPC checks, while a
user travels through multiple stages of a rule execution (workflow
execution), e.g.

kafka topic : user click stream
rpc checks:

if user is member,
if user has shown interest of signup


​workflows:
​

workflow 1: user click -> if user is member do A then do B
workflow 2: user click -> if user has shown interest of signup then do A
otherwise wait for 60 mins and try recheck, expire in 24 hours

The goal is as I said to run workflow1 & workflow2 in one flink job.

My initial thinking is described below.

Sources are a series of Kafka topics; all events go through a coMap that does a
cached lookup of the event -> rules mapping and fans out to multiple {rule,
user} tuples. Based on the rule definition and the stage the user is in for a
given rule, it does a series of async RPC checks and side-outputs to various
sinks.

   - If a {rule, user} tuple needs to stay in operator state longer (1
   day), there should be a window following the async RPC checks, with a
   customized purge trigger firing the passing tuples and purging tuples that
   either passed the check or expired.
   - If a {rule, user} tuple executes to a stage which waits for a Kafka event,
   it should be added to the cache and hooked up with the coMap lookups near
   the sources.


 Does that make sense?

Thanks,
Chen


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-12 Thread Chen Qin
We have seen this issue going back to Flink 1.0. Our finding back then was
traffic congestion to AWS on the internal network. Many teams depend on S3, and
since the bandwidth is shared it causes traffic congestion from time to time.

Hope it helps!

Thanks
Chen

> On Jan 12, 2017, at 03:30, Ufuk Celebi  wrote:
> 
> Hey Shannon!
> 
> Is this always reproducible and how long does it take to reproduce it?
> 
> I've not seen this error before but as you say it indicates that some
> streams are not closed.
> 
> Did the jobs do any restarts before this happened? Flink 1.1.4
> contains fixes for more robust releasing of resources in failure
> scenarios. Is trying 1.1.4 an option?
> 
> – Ufuk
> 
>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey  wrote:
>> I'm having pretty frequent issues with the exception below. It basically
>> always ends up killing my cluster after forcing a large number of job
>> restarts. I just can't keep Flink up & running.
>> 
>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>> AWS support told me the name of the config option. However, that hasn't
>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>> fix the problem, is there anything else I can do? Is anyone else having this
>> problem? Is it possible that the state backend isn't properly calling
>> close() on its filesystem objects? Or is there a large number of concurrent
>> open filesystem objects for some reason? I am using the default
>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>> Any help is appreciated.
>> 
>> java.lang.RuntimeException: Could not initialize state backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:105)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by:
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>> Unable to execute HTTP request: Timeout waiting for connection from pool
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
>> at 

Re: HashMap/HashSet Serialization Issue

2017-01-06 Thread Chen Qin
My understanding is that HashMap doesn't work with Flink's native
serialization framework, though I might be wrong.

This might be worth reading:
https://cwiki.apache.org/confluence/display/FLINK/Type+System,+Type+Extraction,+Serialization
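
Those INFO lines usually just mean the HashMap/HashSet members fall back to
Kryo as GenericType, which generally still works, only slower than Flink's
POJO serializer. A rough sketch of registering the types up front so Kryo can
write compact type IDs (the environment setup is an assumption, not from the
original job):

import java.util.HashMap;
import java.util.HashSet;

import org.apache.flink.api.java.ExecutionEnvironment;

public class RegisterMapTypes {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // the HashMap/HashSet members of the vertex value are handled as
        // GenericType (Kryo); registering them keeps that fallback compact
        env.getConfig().registerKryoType(HashMap.class);
        env.getConfig().registerKryoType(HashSet.class);
    }
}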

-Chen

On Fri, Jan 6, 2017 at 6:06 PM, Charith Wickramarachchi <
charith.dhanus...@gmail.com> wrote:

> Hi All,
>
> I am using flink-gelly and using a custom POJO type as the
> VertexValue/MessageType (I am using the vertex-centric model). The POJO
> contains HashMap/HashSet as members. While executing the job I am getting
> following Log message.
>
> 17:50:53,582 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
> - No fields detected for class java.util.HashSet. Cannot be used as
> a PojoType. Will be handled as GenericType
> 17:50:53,583 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
> - class java.util.HashMap is not a valid POJO type
>
> Is there a way to resolve this issue?
>
> Thanks,
> Charith
>
>
> --
> Charith Dhanushka Wickramaarachchi
>
> Tel  +1 213 447 4253
> Blog  http://charith.wickramaarachchi.org/
> 
> Twitter  @charithwiki 
>
> This communication may contain privileged or other confidential information
> and is intended exclusively for the addressee/s. If you are not the
> intended recipient/s, or believe that you may have
> received this communication in error, please reply to the sender indicating
> that fact and delete the copy you received and in addition, you should
> not print, copy, retransmit, disseminate, or otherwise use the
> information contained in this communication. Internet communications
> cannot be guaranteed to be timely, secure, error or virus-free. The
> sender does not accept liability for any errors or omissions
>


Re: Increasing parallelism skews/increases overall job processing time linearly

2017-01-06 Thread Chen Qin
Just noticed there are only two partitions per topic. Regardless of how large
the parallelism is set, at most two of the source subtasks will ever be
assigned a partition.
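
To make it concrete, a rough sketch (topic, broker addresses and schema are
made up; the consumer class depends on the Kafka version in use). With a
2-partition topic only two source subtasks receive data, while downstream
operators can still run wider after a rebalance:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class PartitionBoundParallelism {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("zookeeper.connect", "zk:2181");
        props.setProperty("group.id", "cvp-test");

        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer08<>("topic-a", new SimpleStringSchema(), props))
                .setParallelism(2); // topic has 2 partitions; extra source subtasks stay idle

        // downstream operators can still run wider once the data is redistributed
        stream.rebalance().print().setParallelism(4);

        env.execute("partition-bound source parallelism");
    }
}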

Sent from my iPhone

> On Jan 6, 2017, at 02:40, Chakravarthy varaga  
> wrote:
> 
> Hi All,
> 
> Any updates on this?
> 
> Best Regards
> CVP
> 
>> On Thu, Jan 5, 2017 at 1:21 PM, Chakravarthy varaga 
>>  wrote:
>> 
>> Hi All,
>> 
>> I have a job as attached.
>> 
>> I have a 16 Core blade running RHEL 7. The taskmanager default number of 
>> slots is set to 1. The source is a kafka stream and each of the 2 
>> sources(topic) have 2 partitions each.
>> What I notice is that when I deploy a job to run with #parallelism=2 the 
>> total processing time doubles the time it took when the same job was 
>> deployed with #parallelism=1. It linearly increases with the parallelism.
>> 
>> Since the numberof slots is set to 1 per TM, I would assume that the job 
>> would be processed in parallel in 2 different TMs and that each consumer in 
>> each TM is connected to 1 partition of the topic. This therefore should have 
>> kept the overall processing time the same or less !!!
>> 
>> The co-flatmap connects the 2 streams & uses ValueState (checkpointed in 
>> FS). I think this is distributed among the TMs. My understanding is that the 
>> search of values state could be costly between TMs.  Do you sense something 
>> wrong here?
>> 
>> Best Regards
>> CVP
>> 
>> 
>> 
>> 
> 


Re: Apache siddhi into Flink

2016-08-27 Thread Chen Qin
​+1​


On Aug 26, 2016, at 11:23 PM, Aparup Banerjee (apbanerj) 
wrote:

Hi-

Has anyone looked into embedding apache siddhi into Flink.

Thanks,
Aparup


Re: State in external db (dynamodb)

2016-07-25 Thread Chen Qin
> I wonder if it can be solved by storing state in the external store with a
> tuple: (state, previous_state, checkpoint_id). Then when reading from the
> store, if checkpoint_id is in the future, read the previous_state,
> otherwise take the current_state.
>

I think that handles the "filtering" part of the work: reads point to the
previous checkpoint's state, which is known to be complete.
It does need to hold two states per snapshot (or base + delta). It would be
nice if Flink could support incremental checkpointing and reason about which
snapshot references which during cleanup, so the records don't keep growing
larger and larger. But that seems off topic from your question.
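
Regarding the plan Josh describes further down (buffer between checkpoints,
flush in snapshotState), a rough sketch is below. The element type and the
writeBatch stub are placeholders, not a real API, and since a checkpoint can
still fail after the flush, the writes should remain idempotent upserts:

import java.util.ArrayList;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingUpsertSink extends RichSinkFunction<Tuple2<String, byte[]>>
        implements Checkpointed<ArrayList<Tuple2<String, byte[]>>> {

    // (key, serialized state) pairs buffered since the last checkpoint
    private ArrayList<Tuple2<String, byte[]>> buffer = new ArrayList<>();

    @Override
    public void invoke(Tuple2<String, byte[]> keyAndState) {
        buffer.add(keyAndState); // batch instead of writing per element
    }

    @Override
    public ArrayList<Tuple2<String, byte[]>> snapshotState(long checkpointId, long timestamp) {
        writeBatch(buffer);         // flush when the checkpoint barrier reaches the sink
        buffer = new ArrayList<>(); // start a fresh batch
        return new ArrayList<>();   // nothing left to snapshot after the flush
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, byte[]>> state) {
        buffer = state;             // re-buffer anything that was snapshotted but not flushed
    }

    private void writeBatch(ArrayList<Tuple2<String, byte[]>> batch) {
        // placeholder: issue one batched, idempotent upsert per key to the
        // external key-value store; a later failed checkpoint can replay these
    }
}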



> On Mon, Jul 25, 2016 at 3:20 PM, Josh <jof...@gmail.com> wrote:
>
>> Hi Chen,
>>
>> Can you explain what you mean a bit more? I'm not sure I understand the
>> problem.
>>
>> Does anyone know if the tooling discussed here has been merged into Flink
>> already? Or if there's an example of what this custom sink would look like?
>> I guess the sink would buffer updates in-memory between checkpoints. Then
>> it would implement the Checkpointed interface and write to the external
>> store in snapshotState(...)?
>>
>> Thanks,
>> Josh
>>
>> On Sun, Jul 24, 2016 at 6:00 PM, Chen Qin <qinnc...@gmail.com> wrote:
>>
>>>
>>>
>>> On Jul 22, 2016, at 2:54 AM, Josh <jof...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> >(1)  Only write to the DB upon a checkpoint, at which point it is known
>>> that no replay of that data will occur any more. Values from partially
>>> successful writes will be overwritten >with correct value. I assume that is
>>> what you thought of when referring to the State Backend, because in some 
>>> sense,
>>> that is what that state backend would do.
>>>
>>>
>>> I feel the problem is about how to commit all snapshots as a
>>> transaction. Partial writes pose cleanup challenges when the job
>>> restores. An easy hack would be to treat RocksDB as a cache and keep
>>> state updates there, like a manifest, and do a cleanup check before
>>> the actual restore.
>>>
>>>
>>>
>>> >I think it is simpler to realize that in a custom sink, than developing
>>>  a new state backend.  Another Flink committer (Chesnay) has developed
>>> some nice tooling for that, to >be merged into Flink soon.
>>>
>>> I am planning to implement something like this:
>>>
>>> Say I have a topology which looks like this: [source => operator =>
>>> sink], I would like it to work like this:
>>> 1. Upon receiving an element, the operator retrieves some state from an
>>> external key-value store (would like to put an in-memory cache on top of
>>> this with a TTL)
>>> 2. The operator emits a new state (and updates its in-memory cache with
>>> the new state)
>>> 3. The sink batches up all the new states and upon checkpoint flushes
>>> them to the external store
>>>
>>> Could anyone point me at the work that's already been done on this? Has
>>> it already been merged into Flink?
>>>
>>> Thanks,
>>> Josh
>>>
>>> On Thu, Apr 7, 2016 at 12:53 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> regarding windows and incremental aggregation. This is already
>>>> happening in Flink as of now. When you give a ReduceFunction on a window,
>>>> which "sum" internally does, the result for a window is incrementally
>>>> updated whenever a new element comes in. This incremental aggregation only
>>>> happens when you specify a ReduceFunction or a FoldFunction, not for the
>>>> general case of a WindowFunction, where all elements in the window are
>>>> required.
>>>>
>>>> You are right about incremental snapshots. We mainly want to introduce
>>>> them to reduce latency incurred by snapshotting. Right now, processing
>>>> stalls when a checkpoint happens.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 7 Apr 2016 at 13:12 Shannon Carey <sca...@expedia.com> wrote:
>>>>
>>>>> Thanks very kindly for your response, Stephan!
>>>>>
>>>>> We will definitely use a custom sink for persistence of idempotent
>>>>> mutations whenever possible. Exposing state as read-only to external
>>>>> systems is a complication we will try to avoid. Also, we will definitely
>>>>>

Re: State in external db (dynamodb)

2016-07-24 Thread Chen Qin


> On Jul 22, 2016, at 2:54 AM, Josh  wrote:
> 
> Hi all,
> 
> >(1)  Only write to the DB upon a checkpoint, at which point it is known that 
> >no replay of that data will occur any more. Values from partially successful 
> >writes will be overwritten >with correct value. I assume that is what you 
> >thought of when referring to the State Backend, because in some sense, that 
> >is what that state backend would do.

I feel the problem is about how to commit all snapshots as a transaction.
Partial writes pose cleanup challenges when the job restores.
An easy hack would be to treat RocksDB as a cache and keep state updates there,
like a manifest, and do a cleanup check before the actual restore.


> 
> >I think it is simpler to realize that in a custom sink, than developing a 
> >new state backend.  Another Flink committer (Chesnay) has developed some 
> >nice tooling for that, to >be merged into Flink soon. 
> 
> I am planning to implement something like this:
> 
> Say I have a topology which looks like this: [source => operator => sink], I 
> would like it to work like this:
> 1. Upon receiving an element, the operator retrieves some state from an 
> external key-value store (would like to put an in-memory cache on top of this 
> with a TTL)
> 2. The operator emits a new state (and updates its in-memory cache with the 
> new state)
> 3. The sink batches up all the new states and upon checkpoint flushes them to 
> the external store
> 
> Could anyone point me at the work that's already been done on this? Has it 
> already been merged into Flink?
> 
> Thanks,
> Josh
> 
>> On Thu, Apr 7, 2016 at 12:53 PM, Aljoscha Krettek  
>> wrote:
>> Hi,
>> regarding windows and incremental aggregation. This is already happening in 
>> Flink as of now. When you give a ReduceFunction on a window, which "sum" 
>> internally does, the result for a window is incrementally updated whenever a 
>> new element comes in. This incremental aggregation only happens when you 
>> specify a ReduceFunction or a FoldFunction, not for the general case of a 
>> WindowFunction, where all elements in the window are required.
>> 
>> You are right about incremental snapshots. We mainly want to introduce them 
>> to reduce latency incurred by snapshotting. Right now, processing stalls 
>> when a checkpoint happens.
>> 
>> Cheers,
>> Aljoscha
>> 
>>> On Thu, 7 Apr 2016 at 13:12 Shannon Carey  wrote:
>>> Thanks very kindly for your response, Stephan!
>>> 
>>> We will definitely use a custom sink for persistence of idempotent 
>>> mutations whenever possible. Exposing state as read-only to external 
>>> systems is a complication we will try to avoid. Also, we will definitely 
>>> only write to the DB upon checkpoint, and the write will be synchronous and 
>>> transactional (no possibility of partial success/failure).
>>> 
>>> However, we do want Flink state to be durable, we want it to be in memory 
>>> when possible, and we want to avoid running out of memory due to the size 
>>> of the state. For example, if you have a wide window that hasn't gotten an 
>>> event for a long time, we want to evict that window state from memory. 
>>> We're now thinking of using Redis (via AWS Elasticache) which also 
>>> conveniently has TTL, instead of DynamoDB.
>>> 
>>> I just wanted to check whether eviction of (inactive/quiet) state from 
>>> memory is something that I should consider implementing, or whether Flink 
>>> already had some built-in way of doing it.
>>> 
>>> Along the same lines, I am also wondering whether Flink already has means 
>>> of compacting the state of a window by applying an aggregation function to 
>>> the elements so-far (eg. every time window is triggered)? For example, if 
>>> you are only executing a sum on the contents of the window, the window 
>>> state doesn't need to store all the individual items in the window, it only 
>>> needs to store the sum. Aggregations other than "sum" might have that 
>>> characteristic too. I don't know if Flink is already that intelligent or 
>>> whether I should figure out how to aggregate window contents myself when 
>>> possible with something like a window fold? Another poster (Aljoscha) was 
>>> talking about adding incremental snapshots, but it sounds like that would 
>>> only improve the write throughput not the memory usage.
>>> 
>>> Thanks again!
>>> Shannon Carey
>>> 
>>> 
>>> From: Stephan Ewen 
>>> Date: Wednesday, April 6, 2016 at 10:37 PM
>>> To: 
>>> Subject: Re: State in external db (dynamodb)
>>> 
>>> Hi Shannon!
>>> 
>>> Welcome to the Flink community!
>>> 
>>> You are right, sinks need in general to be idempotent if you want 
>>> "exactly-once" semantics, because there can be a replay of elements that 
>>> were already written.
>>> 
>>> However, what you describe later, overwriting of a key with a new value (or 
>>> the same value again) is pretty much sufficient. That means that when a 
>>> 

Re: Late arriving events

2016-07-06 Thread Chen Qin
Jamie,

Sorry for the late reply; some of my thoughts are inline below.

-Chen

>
> Another way to do this is to kick off a parallel job to do the backfill
> from the previous savepoint without stopping the current "realtime" job.
> This way you would not have to have a "blackout".  This assumes your final
> sink can tolerate having some parallel writes to it OR you have two
> different sinks and throw a switch from one to another for downstream jobs,
> etc.
>

Sounds great to me. I think it will solve the "blackout" issue I mentioned.
The sink might need to work in more of a read-check-write fashion, but that
should be fine.


>
>
>>
>> In general, I don't know if there are good solutions for all of these
>> scenarios. Some might keep messages in windows longer.(messages not purged
>> yet) Some might kick off another pipeline just dealing with affected
>> windows(messages already purged). What would be suggested patterns?
>>
>
> Of course, ideally you would just keep data in windows longer such that
> you don't purge window state until you're sure there is no more data
> coming.  The problem with this approach in the real world is that you may
> be wrong with whatever time you choose ;)  I would suggest doing the best
> job possible upfront by using an appropriate watermark strategy to deal
> with most of the data.  Then process the truly late data with a separate
> path in the application code.  This "separate" path may have to deal with
> merging late data with the data that's already been written to the sink but
> this is definitely possible depending on the sink.
>

Makes sense. Truly late events should go through a side job that merges with
whatever has already been written to the sink. That might also imply that both
sinks need to be able to do read-check-merge.

E.g., take a job counting search keywords from the beginning of time, where an
outage takes some hosts (partitioned by keyword) down for a couple of days. A
backfill job starts loading and adding counts; after it has backfilled all the
missing keywords and merged the aggregation results, it might need to write
into windows the main job has not yet written, and let the main job pick up
and merge the results.
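
To sketch what read-check-merge could look like for the keyword-count case
(the map here just stands in for whatever shared store both jobs write to):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MergingCountSink extends RichSinkFunction<Tuple2<String, Long>> {

    // stand-in for the external store both the main job and the backfill job write to
    private transient Map<String, Long> store;

    @Override
    public void open(Configuration parameters) {
        store = new HashMap<>(); // a real sink would open a client connection here
    }

    @Override
    public void invoke(Tuple2<String, Long> record) {
        Long current = store.get(record.f0);                        // read
        long merged = (current == null ? 0L : current) + record.f1; // check + merge
        store.put(record.f0, merged);                               // write back
        // with two jobs writing concurrently, this read-modify-write needs to be
        // a conditional update or atomic increment in the real store
    }
}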


Late arriving events

2016-07-05 Thread Chen Qin
Hi there,

I understand Flink currently doesn't support handling late arriving events.
In reality, an exactly-once Flink job needs to deal with missing data or
backfills from time to time without rewinding to a previous savepoint, which
implies the restored job suffers a blackout while it tries to catch up.

In general, I don't know if there are good solutions for all of these
scenarios. Some might keep messages in windows longer (messages not purged
yet); some might kick off another pipeline just dealing with the affected
windows (messages already purged). What would be the suggested patterns?

For the keep-messages-longer approach, we need to coordinate the backfill
messages and the current ongoing messages so that the windows they are
assigned to do not fire before all of these messages have arrived. One
challenge of this approach is determining when all the backfill messages have
been processed. Ideally there would be a customized barrier that travels
through the entire topology and tells the windows the backfill is done; this
would work for both non-keyed and keyed streams. I don't think Flink supports
this yet. Another way would be to use session window merging and extend the
window purge time with some reasonable estimation. This approach is based on
estimation and may add execution latency to those windows.

Which would be suggested way in general?

Thanks,
Chen


Re: s3 statebackend user state size

2016-05-10 Thread Chen Qin
Hi Ufuk,

Yes, it does help with the RocksDB backend!
After tuning the checkpoint frequency to align with the network throughput,
the "task manager released" and job-cancelled failures are gone.
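
For reference, a rough sketch of this kind of setup (bucket path and interval
are made up; the URI constructor is the one Ufuk mentions below):

import java.net.URI;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbOnS3Setup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // local RocksDB files live under taskmanager.tmp.dirs; only checkpoint
        // copies are uploaded to S3 (bucket/path below is made up)
        env.setStateBackend(new RocksDBStateBackend(new URI("s3://my-bucket/flink/checkpoints")));

        // interval tuned so the S3 upload comfortably finishes within it,
        // given the available network throughput (10 minutes as an example)
        env.enableCheckpointing(10 * 60 * 1000);

        env.fromElements(1L, 2L, 3L).print(); // trivial pipeline just to make this runnable

        env.execute("rocksdb checkpoints on s3");
    }
}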

Chen


> On May 10, 2016, at 10:33 AM, Ufuk Celebi <u...@apache.org> wrote:
> 
>> On Tue, May 10, 2016 at 5:07 PM, Chen Qin <qinnc...@gmail.com> wrote:
>> Future, to keep large key/value space, wiki point out using rocksdb as
>> backend. My understanding is using rocksdb will write to local file systems
>> instead of sync to s3. Does flink support memory->rocksdb(local disk)->s3
>> checkpoint state split yet? Or would implement kvstate interface makes flink
>> take care of large state problem?
> 
> Hey Chen,
> 
> when you use RocksDB, you only need to explicitly configure the file
> system checkpoint directory, for which you can use S3:
> 
> new RocksDBStateBackend(new URI("s3://..."))
> 
> The local disk path are configured via the general Flink temp
> directory configuration (see taskmanager.tmp.dirs in
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html,
> default is /tmp).
> 
> State is written to the local RocksDB instance and the RocksDB files
> are copied to S3 on checkpoints.
> 
> Does this help?
> 
> – Ufuk


s3 statebackend user state size

2016-05-10 Thread Chen Qin
Hi there,

With S3 as the state backend, and a large chunk of user state kept on heap, I
can see the task manager start to fail without showing an OOM exception.
Instead, it shows a generic error message (below) when a checkpoint is
triggered. I assume this has something to do with how state is kept in a
buffer and flushed to S3 when the checkpoint is triggered.

Further, to keep a large key/value space, the wiki points to using RocksDB as
the backend. My understanding is that RocksDB writes to the local file system
instead of syncing to S3. Does Flink support a memory -> RocksDB (local disk)
-> S3 checkpoint state split yet? Or would implementing the kvstate interface
make Flink take care of the large-state problem?

Chen

java.lang.Exception: The slot in which the task was executed has been released. 
Probably loss of TaskManager eddbcda03a61f61210063a7cd2148b36 @ 10.163.98.18 - 
24 slots - URL: akka.tcp://flink@10.163.98.18:6124/user/taskmanager at 
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151) 
at 
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
 at 
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119) 
at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156) at 
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
 at 
org.apache.flink.runtime.jobmanager.JobManager$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at 
org.apache.flink.runtime.LeaderSessionMessageFilter$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:33) at 
org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:28) at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at 
org.apache.flink.runtime.LogMessages$anon$1.applyOrElse(LogMessages.scala:28) 
at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at 
akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46) at 
akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369) at 
akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501) at 
akka.actor.ActorCell.invoke(ActorCell.scala:486) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at 
akka.dispatch.Mailbox.run(Mailbox.scala:221) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:231) at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: s3 checkpointing issue

2016-05-04 Thread Chen Qin
Ufuk & Igor,

Thanks for helping out!  Yup, it fixed my issue.

Chen



On Wed, May 4, 2016 at 12:57 PM, Igor Berman <igor.ber...@gmail.com> wrote:

> I think I've had this issue too and fixed it as Ufuk suggested
> in core-site.xml
>
> something like
> <property>
>   <name>fs.s3a.buffer.dir</name>
>   <value>/tmp</value>
> </property>
>
>
> On 4 May 2016 at 11:10, Ufuk Celebi <u...@apache.org> wrote:
>
>> Hey Chen Qin,
>>
>> this seems to be an issue with the S3 file system. The root cause is:
>>
>>  Caused by: java.lang.NullPointerException at
>>
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
>> at
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
>> at
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
>> at
>> org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
>> at
>> org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907) at
>> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888) at
>> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785) at
>>
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
>> at
>> org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
>> ... 25 more
>>
>> From [1] it looks like you have to specify
>>
>> fs.s3a.buffer.dir
>>
>> in the Hadoop configuration (where you set the S3 file system).
>>
>> The expected value is a comma separated list of local directories used
>> to buffer results prior to transmitting the to S3 (for large files).
>>
>> Does this fix the issue? Please report back so that we can include in
>> the "common issues" section of the AWS docs.
>>
>> – Ufuk
>>
>> [1] http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/
>>
>>
>> On Wed, May 4, 2016 at 2:41 AM, Chen Qin <qinnc...@gmail.com> wrote:
>> > Hi there,
>> >
>> > I run a test job with filestatebackend and save checkpoints on s3 (via
>> s3a)
>> >
>> > The job crash when checkpoint triggered. Looking into s3 directory and
>> list
>> > objects. I found the directory is create successfully but all
>> checkpoints
>> > directory size are empty.
>> >
>> > The host running task manager shows following error.
>> >
>> > Received error response:
>> com.amazonaws.services.s3.model.AmazonS3Exception:
>> > Status Code: 404, AWS Service: null, AWS Request ID: CF1845CA84E07549,
>> AWS
>> > Error Code: null, AWS Error Message: Not Found, S3 Extended Request
>> ID:x
>> >
>> > Has anyone met this issue before?
>> >
>> > flink 1.0.0
>> > scala 2.10
>> > hadoop-aws 2.7.2
>> > aws-java-sdk 1.7.4
>> >
>> >
>> > Thanks,
>> > Chen
>> >
>> > Attached full log that shows on web dashboard when job canceled.
>> > java.lang.RuntimeException: Error triggering a checkpoint as the result
>> of
>> > receiving checkpoint barrier at
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
>> > at
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
>> > at
>> >
>> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>> > at
>> >
>> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>> > at
>> >
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>> > at
>> >
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>> > at
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at
>> > java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException:
>> Could
>> > not open output stream for state backend at
>> >
>> org.apache.flink.runtime.state.files

fan out parallel-able operator sub-task beyond total slots number

2016-04-17 Thread Chen Qin
Hi there,


I am trying to run a large number of subtasks within a task slot using a slot
sharing group. The usage scenario is an operator that makes a network call
with high latency yet has a small memory or CPU footprint (sample code below).

From the doc provided, slotSharingGroup seems to be the place to look at. Yet
it seems it was not designed to address the scenario above.
https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#workers-slots-resources

My question is: what is the best way to fan out a large number of subtasks in
parallel within a task?

public void testFanOut() throws Exception {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
    ...
    env.addSource(...).setParallelism(1).disableChaining().shuffle()
        .flatMap(new FlatMapFunction<DummyFlinkRecord, Long>() {
            @Override
            public void flatMap(DummyFlinkRecord dummyFlinkRecord,
                                Collector<Long> collector) throws Exception {
                Thread.sleep(1000); // latency is high, needs to fan out
                collector.collect(1L);
            }
        }).slotSharingGroup("flatmap").setParallelism(100).rebalance()
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return true;
            }
        }).setParallelism(10)
        .addSink(new SinkFunction<Long>() {
            @Override
            public void invoke(Long aLong) throws Exception {
                System.out.println(aLong);
            }
        });
    env.execute("fan out 100 subtasks for 1s delay mapper");
}

Thanks,
Chen Qin