Re: Using Queryable State within 1 job + docs suggestion

2020-05-21 Thread Tzu-Li (Gordon) Tai
This in general is not a good idea, as the state you query using queryable
state within a job does not provide any consistency guarantees at all.

Would it be possible to have a trigger that emits the state of the windows,
and join the emitted states downstream?
In general, that is a better approach for what you seem to be trying to
achieve.
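
For example, a rough sketch of that pattern (the Event and KeyedWindowState
types here are purely illustrative):

// Emit each window's accumulated state downstream instead of querying it in place.
DataStream<KeyedWindowState> windowState = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new ProcessWindowFunction<Event, KeyedWindowState, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Event> elements,
                            Collector<KeyedWindowState> out) {
            long count = 0;
            for (Event ignored : elements) {
                count++;
            }
            out.collect(new KeyedWindowState(key, ctx.window().getEnd(), count));
        }
    });

// The emitted states can then be keyed and connected/joined with the other
// stream downstream (e.g. keyBy(...).connect(...) with a CoProcessFunction),
// instead of using queryable state from within the job.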

Otherwise, when it comes to "querying state across operators", that's a hint
that the Stateful Functions [1] model could be a better fit for your use
case. Specifically, using Stateful Functions, you would model "querying
state" as a request to the target function, with the target function sending
its state back as a response.

Cheers,
Gordon

[1] https://flink.apache.org/stateful-functions.html





Re: Using Queryable State within 1 job + docs suggestion

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi,

That in general is not a good idea, both because of the problem you
mentioned and because the state you query within the same job using
queryable state does not come with any consistency guarantees.

When it comes to "querying state from another operator", it is a hint that
your use case can potentially be
better modeled using the Stateful Functions framework [1]. With Stateful
Functions, you would model this
as a request message to the target function, with the target function
replying with a response that carries its state.
There are still other shortcomings though; for example, StateFun currently
doesn't support windowed state.
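
A rough sketch of that modeling with the embedded Java SDK (the QueryState /
StateSnapshot message types and the persisted value are illustrative, not
part of any shipped example):

public class TargetFn implements StatefulFunction {

    @Persisted
    private final PersistedValue<Long> count = PersistedValue.of("count", Long.class);

    @Override
    public void invoke(Context context, Object input) {
        if (input instanceof QueryState) {
            // "Querying state" becomes plain messaging: reply to the caller with our state.
            context.reply(new StateSnapshot(count.getOrDefault(0L)));
        } else {
            count.set(count.getOrDefault(0L) + 1);
        }
    }
}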

Cheers,
Gordon

[1] https://flink.apache.org/stateful-functions.html

On Thu, May 21, 2020 at 10:25 PM Annemarie Burger <
annemarie.bur...@campus.tu-berlin.de> wrote:

> Hi,
>
> So what I meant was that I have a keyed stream, and from each
> thread/keygroup/PU I want to query the state of the other
> threads/keygroups/PUs.
>
> Does anybody have any experience with this?
>
> I'm currently working on it, and the main problem seems to be that the
> Queryable State Client requires the JobID from which to query the state,
> which in my case would be the same as its own jobID. Any ideas how to
> workaround this?
> Using env.getStreamGraph.getJobGraph.getJobID doesn't seem to work.
>
> Best,
> Annemarie
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Performance impact of many open windows at the same time

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi Joe,

The main effect this should have is more state to be kept until the windows
can be fired (and state purged).
This would of course increase the time it takes to checkpoint the operator.

I'm not sure if there will be a significant per-record runtime impact caused
by how windows are bookkept in data structures in the WindowOperator -
maybe Aljoscha (cc'ed) can chime in here.
If it is certain that these windows will never fire (until far into the
future) because the event timestamps were corrupted in the first place, then
it might make sense to have a way to drop windows based on some criteria.
I'm not sure if that is supported in any way without triggers (since you
mentioned that those windows might not receive any data) - again, Aljoscha
might be able to provide more info here.
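
Purely as an illustration of what such criteria-based dropping could look
like (this is not an existing Flink feature, and the drift bound is an
arbitrary assumption), a wrapper trigger could purge windows whose end lies
implausibly far beyond the current watermark:

public class DropFarFutureWindowsTrigger extends Trigger<Object, TimeWindow> {

    // Illustrative bound: anything more than a day beyond the watermark is treated as corrupt.
    private static final long MAX_DRIFT_MS = 24L * 60 * 60 * 1000;

    private final EventTimeTrigger inner = EventTimeTrigger.create();

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        long watermark = ctx.getCurrentWatermark();
        if (watermark > Long.MIN_VALUE && window.getEnd() > watermark + MAX_DRIFT_MS) {
            return TriggerResult.PURGE; // drop the window's contents right away
        }
        return inner.onElement(element, timestamp, window, ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return inner.onEventTime(time, window, ctx);
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return inner.onProcessingTime(time, window, ctx);
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        inner.clear(window, ctx);
    }
}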

Cheers,
Gordon

On Thu, May 21, 2020 at 7:02 PM Joe Malt  wrote:

> Hi all,
>
> I'm looking into what happens when messages are ingested with timestamps
> far into the future (e.g. due to corruption or a wrong clock at the sender).
>
> I'm aware of the effect on watermarking, but another thing I'm concerned
> about is the performance impact of the extra windows this will create.
>
> If a Flink operator has many (perhaps hundreds or thousands) of windows
> open but not receiving any data (and never firing), will this degrade
> performance?
>
> Thanks,
> Joe
>


Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Are you getting an exception from running the Harness?
The Harness should already have the required configurations, such as the
parent first classloading config.

Otherwise, if you would like to add your own configuration, use the
`withConfiguration` method on the `Harness` class.
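
For example, roughly like this (the ingress/egress identifiers and the
generator are placeholders along the lines of the Harness examples; please
double-check the exact builder methods against the statefun-flink-harness
version you are using):

Harness harness = new Harness()
    .withKryoMessageSerializer()
    // Pass any extra Flink configuration the application needs:
    .withConfiguration("classloader.parent-first-patterns.additional",
        "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf")
    .withSupplyingIngress(MyConstants.REQUESTS, new MyMessageGenerator())
    .withPrintingEgress(MyConstants.RESPONSES);

harness.start();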

On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> Also, where do I put flink-conf.yaml in IDEA to add the additional required
> config parameter:
>
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
>
>
> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
> Hi,
> I am trying to run
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>  locally
> using
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>
> And have several questions.
> 1. It seems fairly straightforward to use it with in-memory message
> generators, but I can't figure out how to add a Kafka ingress/egress so that
> I can use it with Kafka.
> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
> Harness. Is there a way to short circuit it and have Harness get
> StatefulFunctionUniverse directly
> 3. Is there an example of how to write a Flink main for stateful functions?
> 4. Is there an example anywhere on how to run such examples in the IDE
> with Kafka?
> 5 There is a great stateful functions example
> https://github.com/ververica/flink-statefun-workshop, but its readme does
> not really describe implementation and neither does this article,
> referencing it
> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39. Is
> there anything that describes this implementation?
>
>
>


Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Sorry, forgot to cc user@ as well in the last reply.

On Fri, May 22, 2020 at 12:01 PM Tzu-Li (Gordon) Tai 
wrote:

> As an extra note, the utilities you will find in `statefun-e2e-tests`,
> such as the `StatefulFunctionsAppsContainers`, are not yet intended for
> users. This has been discussed before, however. It would be great to hear
> feedback on how it works for you if you do decide to give it a try.
>
> On Fri, May 22, 2020 at 11:58 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
>> boris.lublin...@lightbend.com> wrote:
>>
>>> Also, where do I put flink-conf.yaml in IDEA to add the additional required
>>> config parameter:
>>>
>>> classloader.parent-first-patterns.additional: 
>>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>>
>>>
>>>
>>> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
>>> boris.lublin...@lightbend.com> wrote:
>>>
>>> Hi,
>>> I am trying to run
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>>  locally
>>> using
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>>
>>> And have several questions.
>>> 1. It seems fairly straightforward to use it with in-memory message
>>> generators, but I can't figure out how to add a Kafka ingress/egress so that
>>> I can use it with Kafka.
>>>
>>> Could you provide some context on why you would want to do that?
>>
>> The StateFun Flink Harness was not intended to work with the usual
>> shipped ingress / egresses, but purely as a utility for users to run
>> StateFun applications in a consolidated local setup.
>> For testing against Kafka, I would suggest looking at how the StateFun
>> end-to-end tests do it, using testcontainers.
>> The tests are located under `statefun-e2e-tests` module.
>>
>> If you still want to use the Flink Harness for this, you may be able to
>> use the withFlinkSourceFunction function to directly supply the Flink Kafka
>> connector.
>> This only works for the ingress side, though.
>>
>>> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
>>> Harness. Is there a way to short circuit it and have Harness get
>>> StatefulFunctionUniverse directly
>>>
>>> That is not possible. The StatefulFunctionUniverse that the Harness
>> utility provides is always a "mock" one, which contains the defined
>> in-memory ingress and egresses.
>> As previously mentioned, that is because the Flink Harness was intended
>> for running StateFun applications without the need to interact with any
>> other external systems.
>>
>>> 3. Is there an example of how to write a Flink main for stateful functions?
>>>
>>> At the moment, it is not possible to directly integrate Flink APIs and
>> Stateful Functions APIs in a single job.
>> What do you have in mind for what you want to achieve?
>>
>>> 4. Is there an example anywhere on how to run such examples in the IDE
>>> with Kafka?
>>>
>> The tests in `statefun-e2e-tests` can be run in the IDE and test
>> against Kafka. They do require Docker to be available, though.
>>
>>> 5 There is a great stateful functions example
>>> https://github.com/ververica/flink-statefun-workshop, but its readme
>>> does not really describe implementation and neither does this article,
>>> referencing it
>>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39.
>>> Is there anything that describes this implementation?
>>>
>>> I think the bottom half of the article provides some details of the
>> example, including the messaging between functions and a rough sketch of
>> the functions. Maybe it's not detailed enough?
>> In particular, what parts of the example would you want to have more
>> details on?
>>
>> Cheers,
>> Gordon
>>
>>
>


Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi,

On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> Also, where do I put flink-conf.yaml in IDEA to add the additional required
> config parameter:
>
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
>
>
> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
> Hi,
> I am trying to run
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>  locally
> using
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>
> And have several questions.
> 1. It seems fairly straightforward to use it with in-memory message
> generators, but I can't figure out how to add a Kafka ingress/egress so that
> I can use it with Kafka.
>
> Could you provide some context on why you would want to do that?

The StateFun Flink Harness was not intended to work with the usual shipped
ingress / egresses, but purely as a utility for users to run StateFun
applications in a consolidated local setup.
For testing against Kafka, I would suggest looking at how the StateFun
end-to-end tests do it, using testcontainers.
The tests are located under `statefun-e2e-tests` module.

If you still want to use the Flink Harness for this, you may be able to use
the withFlinkSourceFunction function to directly supply the Flink Kafka
connector.
This only works for the ingress side, though.
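
Roughly, that could look like the following (the ingress identifier is a
placeholder, and the exact Harness method signatures should be checked
against the version you are using):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaConsumer<String> kafkaSource =
    new FlinkKafkaConsumer<>("my-input-topic", new SimpleStringSchema(), props);

Harness harness = new Harness()
    .withKryoMessageSerializer()
    // Feed the StateFun ingress directly from a Flink source function (ingress side only):
    .withFlinkSourceFunction(MyConstants.REQUESTS, kafkaSource)
    .withPrintingEgress(MyConstants.RESPONSES);

harness.start();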

> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
> Harness. Is there a way to short circuit it and have Harness get
> StatefulFunctionUniverse directly
>
> That is not possible. The StatefulFunctionUniverse that the Harness
utility provides is always a "mock" one, which contains the defined
in-memory ingress and egresses.
As previously mentioned, that is because the Flink Harness was intended for
running StateFun applications without the need to interact with any other
external systems.

> 3. Is there an example of how to write a Flink main for stateful functions?
>
> At the moment, it is not possible to directly integrate Flink APIs and
Stateful Functions APIs in a single job.
What do you have in mind for what you want to achieve?

> 4. Is there an example anywhere on how to run such examples in the IDE
> with Kafka?
>
> The tests in `statefun-e2e-tests` can be run in the IDE and test against
Kafka. They do require Docker to be available, though.

> 5 There is a great stateful functions example
> https://github.com/ververica/flink-statefun-workshop, but its readme does
> not really describe implementation and neither does this article,
> referencing it
> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39. Is
> there anything that describes this implementation?
>
> I think the bottom half of the article provides some details of the
example, including the messaging between functions and a rough sketch of
the functions. Maybe it's not detailed enough?
In particular, what parts of the example would you want to have more
details on?

Cheers,
Gordon


Re: How To subscribe a Kinesis Stream using enhance fanout?

2020-05-14 Thread Tzu-Li (Gordon) Tai
Hi Xiaolong,

You are right, the way the Kinesis connector is implemented / the way the
AWS APIs are used, does not allow it to consume Kinesis streams with
enhanced fan-out enabled consumers [1].
Could you open a JIRA ticket for this?
As far as I can tell, this could be a valuable contribution to the
connector for Kinesis users who require dedicated throughput isolated from
other running consumers.

Cheers,
Gordon

[1]
https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html

On Wed, May 13, 2020 at 1:44 PM Xiaolong Wang 
wrote:

> Hello Flink Community!
>
>   I'm currently coding on a project relying on AWS Kinesis. With the
> provided connector (flink-connector-kinesis_2.11;1.10.0), I can consume the
> message.
>
>  But as the main stream is used among several other teams, I was
> required to use the enhance fanout of Kinesis. I checked the connector code
> and found no implementations.
>
>  Has this issue occurred to anyone before?
>
> Thanks for your help.
>


Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-11 Thread Tzu-Li (Gordon) Tai
In that case, the most likely cause would be
https://issues.apache.org/jira/browse/FLINK-16313, whose fix is included in
Flink 1.10.1 (to be released).

The release candidates for Flink 1.10.1 are currently being voted on; would it
be possible for you to try one out and see if the error still occurs?

On Mon, May 11, 2020 at 4:11 PM luisfaamaral 
wrote:

> Thanks Gordon and Seth for the reply.
>
> So.. the main project contains the below flink dependencies...
>
>
>
> And the state processor project contains the following:
> 1.9.0
>
>
>
> At the first sight I may say all the libraries match to 1.9.0 flink
> libraries within both projects.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Statefun 2.0 questions

2020-05-10 Thread Tzu-Li (Gordon) Tai
Hi,

Correct me if I'm wrong, but from the discussion so far it seems like what
Wouter is looking for is an HTTP-based ingress / egress.

We have been thinking about this in the past. The specifics of the
implementation are still to be discussed, but to be able to ensure
exactly-once processing semantics, behind the scenes of an HTTP-based
ingress, external messages / responses will still likely be routed through
durable messaging systems such as Kafka / Pulsar / etc.

Gordon





Re: Rich Function Thread Safety

2020-05-10 Thread Tzu-Li (Gordon) Tai
As others have mentioned already, it is true that method calls on operators
(e.g. processing events and snapshotting state) will not happen
concurrently.

As for your findings in reading through the documentation, that might be a
hint that we could add a bit more explanation mentioning this.
Could you suggest where you'd expect to see this mentioned, based on your
read-through?

Cheers,
Gordon





Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-08 Thread Tzu-Li (Gordon) Tai
Hi,

The last time I saw this error, was that there was a mismatch in the used
flink-state-processor-api version and other core Flink dependencies.
Could you confirm that?

Also, are you seeing this assertion error consistently, or only
occasionally?
cc'ing Seth, maybe he has other clues on the cause.

Cheers,
Gordon

On Fri, May 8, 2020 at 3:06 PM luisfaamaral 
wrote:

> No one? :)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Benchmark for Stateful Functions

2020-05-04 Thread Tzu-Li (Gordon) Tai
Hi Omid,

There currently aren't any benchmarks that I know of for Stateful Functions.

However, Stateful Functions applications run on top of Apache Flink and
therefore share the same network stack / runtime. So, if throughput and
latency is your only concern, you should be able carry over any results from
Flink.

What in particular are you interested in?
As for any benchmark, it'd only be useful with a specific use case /
scenario in mind.
It would be interesting to hear what you have in mind.

Cheers,
Gordon





Re: Question about EventTimeTrigger

2020-04-13 Thread Tzu-Li (Gordon) Tai
Hi,

Could you briefly describe what you are trying to achieve?

By definition, a GlobalWindow includes all data - the ending timestamp for
these windows is therefore Long.MAX_VALUE. An event time trigger wouldn't
make sense here, since that trigger would never fire (the watermark can never
pass the end timestamp of a GlobalWindow).
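
If you do want windowing behaviour on a GlobalWindow, you have to supply a
trigger that fires on something other than event time reaching the end of
the window, for example (a small sketch; the event type and its methods are
illustrative):

stream
    .keyBy(event -> event.getKey())
    .window(GlobalWindows.create())
    // The default trigger of GlobalWindows never fires; supply one that does,
    // e.g. fire and purge for every 100 elements per key:
    .trigger(PurgingTrigger.of(CountTrigger.of(100)))
    .reduce((a, b) -> a.merge(b));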

Gordon





Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-13 Thread Tzu-Li (Gordon) Tai
Hi,

As you mentioned, Gelly Graphs are backed by Flink DataSets, and therefore
work primarily on static graphs. I don't think it'll be possible to
implement the incremental algorithms described in your SO question.

Have you tried looking at Stateful Functions, a recent new API added to
Flink?
It supports arbitrary messaging between functions, which may allow you to
build what you have in mind.
Take a look at Seth's and Igal's comments here [1], where there seems to be a
similar incremental graph-processing use case for sessionization.

Cheers,
Gordon

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017





Re: [Stateful Functions] Using Flink CEP

2020-04-13 Thread Tzu-Li (Gordon) Tai
Hi!

It isn't possible to use Flink CEP within Stateful Functions.

That could be an interesting primitive to add: CEP-based function
constructs.
Could you briefly describe what you are trying to achieve?

On the other hand, there are plans to integrate Stateful Functions more
closely with the Flink APIs.
One direction we've been thinking about is to, for example, support Flink
DataStreams as StateFun ingress / egresses. In this case, you'll be able to
use Flink CEP to detect patterns, and use the results as an ingress which
invokes functions within a StateFun app.

Cheers,
Gordon





Re: StateFun - Multiple modules example

2020-04-08 Thread Tzu-Li (Gordon) Tai
Hi Oytun!

You can see here an example of how to package a StateFun application image
that contains multiple modules:
https://ci.apache.org/projects/flink/flink-statefun-docs-stable/deployment-and-operations/packaging.html#images

Essentially, for each module you want to include in your application, you
add the jar containing the service file (for embedded JVM functions)
or the YAML module definition file (for remote functions) under the
"/opt/statefun/modules" directory of the packaged image.

Cheers,
Gordon


On Thu, Apr 9, 2020 at 7:43 AM Oytun Tez  wrote:

> Hi there,
>
> Does anyone have any statefun 2.0 examples with multiple modules?
>
>
>
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>


[ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Stateful Functions 2.0.0.

Stateful Functions is an API that simplifies building distributed stateful
applications.
It's based on functions with persistent state that can interact dynamically
with strong consistency guarantees.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at:
https://pypi.org/project/apache-flink-statefun/

Official Docker image for building Stateful Functions applications is
currently being published to Docker Hub.
Dockerfiles for this release can be found at:
https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
Progress for creating the Docker Hub repository can be tracked at:
https://github.com/docker-library/official-images/pull/7749

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346878

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: state schema evolution for case classes

2020-04-02 Thread Tzu-Li (Gordon) Tai
Hi,

I see the problem that you are bumping into as the following:
- In your previous job, you seem to be falling back to Kryo for the state
serialization.
- In your new job, you are trying to change that to use a custom serializer.

You can confirm this by looking at the stack trace of the "new state
serializer is not compatible" exception.
Could you maybe post me the stack trace?

If that is indeed the case, I'm afraid that right now, you'll only be able
to resolve this using the State Processor API to migrate this serializer
offline.
The reason for this is as follows:
- In order to perform the serializer migration at restore time, Flink needs
to understand that the old serializer (i.e. the Kryo serializer in your
case) is compatible with the new serializer (the custom TestDataNested
serializer in your case).
- Currently, Flink cannot reason about serializer compatibility when the
serializer changes completely to a different class than before. Therefore,
if the serializer class changes, to be safe, right now Flink always assume
that the new serializer is not compatible and therefore fails the restore.

You can manually force this migration offline, as I said using the State
Processor API:
- The steps would be to load the previous savepoint, and when reading your
`testdata-join` state values, use the previous way of providing the
serializer (i.e. classOf[TestDataNested]).
- Then, bootstrap a new savepoint with the state values read from
`testdata-join`. You may use whatever new serializer you want to write the
state into the new savepoint.
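
Roughly, that would look like the following sketch (assume it runs in a main
method that throws Exception; the uid, paths, key selector, and the reader /
bootstrap functions are placeholders you would adapt to your job):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

// 1) Load the old savepoint with the same state backend that created it.
ExistingSavepoint oldSavepoint = Savepoint.load(env, "hdfs:///savepoints/old", backend);

// 2) Read the `testdata-join` values, declaring the state the old way
//    (i.e. with classOf[TestDataNested]) inside the reader function.
DataSet<TestDataNested> values =
    oldSavepoint.readKeyedState("my-operator-uid", new TestDataReaderFunction());

// 3) Bootstrap a fresh savepoint, registering the state with the new custom
//    serializer inside the bootstrap function.
BootstrapTransformation<TestDataNested> bootstrap =
    OperatorTransformation.bootstrapWith(values)
        .keyBy(data -> data.getKey())
        .transform(new TestDataBootstrapFunction());

Savepoint.create(backend, 128)
    .withOperator("my-operator-uid", bootstrap)
    .write("hdfs:///savepoints/new");

env.execute("migrate testdata-join serializer");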

As a side note, I have been thinking about two options that would allow an
easier path for users to do this:
Option #1: The Kryo serializer should assume that new serializers are
always compatible, given that the target serialized classes are the same
(which is true for your case). This allows users to opt-out of Kryo
serialization, which has always just been a fallback that many users did
not realize they were using when Flink cannot interpret the state type.
Option #2: Maybe add a "force-migration" option when restoring from
savepoints. This would essentially be an online version of the State
Processor API process I explained above, but instead of happening offline,
the migration would happen at restore from savepoints.

TL;DR: for now, I would suggest to try using the State Processor API to
migrate the serializer for your specific case.

Cheers,
Gordon

On Thu, Apr 2, 2020 at 11:14 PM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Hi Gordon,
>
> thanks for your response , So I have done a POC on state migration using
> avro, it seems it works out well.
>
> I am using custom avro serializer (with avro schema and (TypeSerializer,
> TypeSerializerSnapshot) and based on that written my own custom
> serializer for the scala case class that I am serialising (I am using
> rocksdb as statedbackend).
>
> So when I am evolve the class with any datatype I just change avsc(avro
> schema json) and give old schema as well as new schema to serialise data
> already in rocksDB to read and accordingly write it with new and it works
> just fine. So I can add new class to my application supporting schema
> evolution,
>
> I have define state like this :
>
>
> private[this] lazy val stateDescriptorTest: 
> ValueStateDescriptor[TestDataNested] =
>   new ValueStateDescriptor[TestDataNested]("testdata-join", 
> TestDataNested.serializer)
> private[this] lazy val stateTest: ValueState[TestDataNested] = 
> getRuntimeContext.getState(stateDescriptorTest)
>
>
> Now the problem with the existing class in my current application we have 
> define state as follow (for example):
>
>
> private[this] lazy val stateDescriptorTest: 
> ValueStateDescriptor[TestDataNested] =
>   new ValueStateDescriptor[TestDataNested]("testdata-join", 
> classOf[TestDataNested])
> private[this] lazy val stateTest: ValueState[TestDataNested] = 
> getRuntimeContext.getState(stateDescriptorTest)
>
> So when I provide TestDataNested.serializer instead of "classOf[TestDataNested]"
> in my current application (basically replacing the serializer), it throws
> "new state serializer is not compatible".
>
> What can I do here, would be great help thanks in advance
>
> On Fri, Mar 27, 2020 at 1:19 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Apoorv,
>>
>> Sorry for the late reply, have been quite busy with backlog items the
>> past days.
>>
>> On Fri, Mar 20, 2020 at 4:37 PM Apoorv Upadhyay <
>> apoorv.upadh...@razorpay.com> wrote:
>>
>>> Thanks Gordon for the suggestion,
>>>
>>> I am going by this repo :
>>> https://github.com/mrooding/flink-avro-state-serialization
>>>
>>> So far I am able to alter the scala case cla

Re: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Tzu-Li (Gordon) Tai
Thanks! Looking forward to that.

On Tue, Mar 31, 2020 at 1:02 AM Mark Niehe  wrote:

> Hi Gordan and Seth,
>
> Thanks for explanation and opening up the ticket. I'll add some details in
> the ticket to explain what we're trying to do which will hopefully add some
> context.
>
> --
> <http://segment.com/>
> Mark Niehe ·  Software Engineer
> Integrations
> <https://segment.com/catalog?utm_source=signature_medium=email>  ·
> Blog <https://segment.com/blog?utm_source=signature_medium=email>  ·  
> We're
> Hiring! <https://segment.com/jobs?utm_source=signature_medium=email>
>
> On Mon, Mar 30, 2020 at 1:04 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> It seems like Seth's reply didn't make it to the mailing lists somehow.
>> Forwarding his reply below:
>>
>> -- Forwarded message -
>> From: Seth Wiesman 
>> Date: Thu, Mar 26, 2020 at 5:16 AM
>> Subject: Re: Lack of KeyedBroadcastStateBootstrapFunction
>> To: Dawid Wysakowicz 
>> Cc: , Tzu-Li (Gordon) Tai 
>>
>>
>> As Dawid mentioned, you can implement your own operator using the
>> transform method to do this yourself. Unfortunately, that is fairly low
>> level and would require you to understand some amount of Flink internals.
>>
>> The real problem is that the state processor api does not support two
>> input operators. We originally skipped that because there were a number of
>> open questions about how best to do it and it wasn't clear that it would be
>> a necessary feature. Typically, flink users use two input operators to do
>> some sort of join. And when bootstrapping state, you typically only want to
>> pre-fill one side of that join. KeyedBroadcastState is clearly a good
>> counter-argument to that.
>>
>> I've opened a ticket for the feature if you would like to comment there.
>>
>> https://issues.apache.org/jira/browse/FLINK-16784
>>
>> On Tue, Mar 24, 2020 at 9:17 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi,
>>>
>>> I am not very familiar with the State Processor API, but from a brief
>>> look at it, I think you are right. I think the State Processor API does not
>>> support mixing different kinds of states in a single operator for now. At
>>> least not in a nice way. Probably you could implement the
>>> KeyedBroadcastStateBootstrapFunction yourself and use it with
>>> KeyedOperatorTransformation#transform(org.apache.flink.state.api.SavepointWriterOperatorFactory).
>>> I understand this is probably not the easiest task.
>>>
>>> I am not aware if there are plans to support that out of the box, but I
>>> cc'ed Gordon and Seth who if I remember correctly worked on that API. I
>>> hope they might give you some more insights.
>>>
>>> Best,
>>>
>>> Dawid
>>>  On 23/03/2020 17:36, Mark Niehe wrote:
>>>
>>> Hey all,
>>>
>>> I have another question about the State Processor API. I can't seem to
>>> find a way to create a KeyedBroadcastStateBootstrapFunction operator. The
>>> two options currently available to bootstrap a savepoint with state are
>>> KeyedStateBootstrapFunction and BroadcastStateBootstrapFunction. Because
>>> these are the only two options, it's not possible to bootstrap both keyed
>>> and broadcast state for the same operator. Are there any plans to add that
>>> functionality or did I miss it entirely when going through the API docs?
>>>
>>> Thanks,
>>> --
>>> <http://segment.com/>
>>> Mark Niehe ·  Software Engineer
>>> Integrations
>>> <https://segment.com/catalog?utm_source=signature_medium=email>  ·
>>> Blog <https://segment.com/blog?utm_source=signature_medium=email>
>>>   ·  We're Hiring!
>>> <https://segment.com/jobs?utm_source=signature_medium=email>
>>>
>>>


Fwd: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Tzu-Li (Gordon) Tai
It seems like Seth's reply didn't make it to the mailing lists somehow.
Forwarding his reply below:

-- Forwarded message -
From: Seth Wiesman 
Date: Thu, Mar 26, 2020 at 5:16 AM
Subject: Re: Lack of KeyedBroadcastStateBootstrapFunction
To: Dawid Wysakowicz 
Cc: , Tzu-Li (Gordon) Tai 


As Dawid mentioned, you can implement your own operator using the transform
method to do this yourself. Unfortunately, that is fairly low level and
would require you to understand some amount of Flink internals.

The real problem is that the state processor api does not support two input
operators. We originally skipped that because there were a number of open
questions about how best to do it and it wasn't clear that it would be a
necessary feature. Typically, flink users use two input operators to do
some sort of join. And when bootstrapping state, you typically only want to
pre-fill one side of that join. KeyedBroadcastState is clearly a good
counter-argument to that.

I've opened a ticket for the feature if you would like to comment there.

https://issues.apache.org/jira/browse/FLINK-16784

On Tue, Mar 24, 2020 at 9:17 AM Dawid Wysakowicz 
wrote:

> Hi,
>
> I am not very familiar with the State Processor API, but from a brief look
> at it, I think you are right. I think the State Processor API does not
> support mixing different kinds of states in a single operator for now. At
> least not in a nice way. Probably you could implement the
> KeyedBroadcastStateBootstrapFunction yourself and use it with
> KeyedOperatorTransformation#transform(org.apache.flink.state.api.SavepointWriterOperatorFactory).
> I understand this is probably not the easiest task.
>
> I am not aware if there are plans to support that out of the box, but I
> cc'ed Gordon and Seth who if I remember correctly worked on that API. I
> hope they might give you some more insights.
>
> Best,
>
> Dawid
>  On 23/03/2020 17:36, Mark Niehe wrote:
>
> Hey all,
>
> I have another question about the State Processor API. I can't seem to
> find a way to create a KeyedBroadcastStateBootstrapFunction operator. The
> two options currently available to bootstrap a savepoint with state are
> KeyedStateBootstrapFunction and BroadcastStateBootstrapFunction. Because
> these are the only two options, it's not possible to bootstrap both keyed
> and broadcast state for the same operator. Are there any plans to add that
> functionality or did I miss it entirely when going through the API docs?
>
> Thanks,
> --
> <http://segment.com/>
> Mark Niehe ·  Software Engineer
> Integrations
> <https://segment.com/catalog?utm_source=signature_medium=email>  ·
> Blog <https://segment.com/blog?utm_source=signature_medium=email>  ·  
> We're
> Hiring! <https://segment.com/jobs?utm_source=signature_medium=email>
>
>


Re: state schema evolution for case classes

2020-03-27 Thread Tzu-Li (Gordon) Tai
Hi Apoorv,

Sorry for the late reply, have been quite busy with backlog items the past
days.

On Fri, Mar 20, 2020 at 4:37 PM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Thanks Gordon for the suggestion,
>
> I am going by this repo :
> https://github.com/mrooding/flink-avro-state-serialization
>
> So far I am able to alter the scala case classes and able to restore from
> savepoint using memory state backend, but when I am using rocksdb as
> statebackend and try to restore from savepoint it break with following
> error :
>

When you say restoring it with the RocksDB backend, was the savepoint you
are attempting to restore from taken with the RocksDB backend as well?
I'm asking that, because currently you cannot change the state backend
across restores, as they have different savepoint binary formats.
This is also the case when you use the State Processor API - when you load
an existing savepoint, you first have to load it with the same state
backend that was used to create the savepoint. You can change the state
backend using the State Processor API, by creating a new savepoint with
your desired target backend, and dumping all state data extracted from the
loaded savepoint into the new fresh savepoint.
There have been previous proposals (FLIP-41) [1] to unify the savepoint
formats, which would make a lot of this easier, but AFAIK this isn't on the
roadmap in the near future.

Best Regards,
Gordon

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State


>
> org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from 
> RocksDB.
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:92)
>   at 
> nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:14)
>   at 
> nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:8)
>   at 
> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
>   at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
>   at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
>   at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
>   at 
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>   at 
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>   at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at 
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>   at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>   at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>   at 
> nl.mrooding.state.CustomAvroSerializer$class.deserialize(CustomAvroSerializer.scala:42)
>   at 
> nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
>   at 
> nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
>   ... 8 more
>
>
>
>
> On Wed, Mar 18, 2020 at 10:56 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Apoorv,
>>
>> Flink currently does not natively support schema evolution for state
>> types using Scala case classes [1].
>>
>> So, as Roman has pointed out, there are 2 possible ways for you to do
>> that:
>> - Implementing a custom serializer that supports schema evolution for your
>> specific Scala case classes, as Roman suggested.
>> - or, using the State Processor API [2] to migrate your case classes
>> offline as a batch job
>>
>> For your question on how to implement a schema-evolution supporting
>> serializer, can you share wi

Re: state schema evolution for case classes

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi Apoorv,

Flink currently does not natively support schema evolution for state types
using Scala case classes [1].

So, as Roman has pointed out, there are 2 possible ways for you to do that:
- Implementing a custom serializer that supports schema evolution for your
specific Scala case classes, as Roman suggested.
- or, using the State Processor API [2] to migrate your case classes
offline as a batch job

For your question on how to implement a schema-evolution supporting
serializer, can you share with me the problems you have met so far?
Otherwise, if you take a look at the PojoSerializerSnapshot class, that
would be a starting point to implement something similar for your case
classes.

As you will quickly realize, it's not simple, so I would strongly suggest
trying out the approach of using the State Processor API.
Either way, if you bump into any problems, feel free to let me know.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-10896
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

On Wed, Mar 18, 2020 at 1:04 PM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Thanks a lot , Also can you share one example where these has been
> implemented? I have gone through docs does not happen to work still
>
> On Wed, Feb 26, 2020 at 7:59 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Apoorv,
>>
>> You can achieve this by implementing custom serializers for your state.
>> Please refer to
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html
>>
>> Regards,
>> Roman
>>
>>
>> On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay <
>> apoorv.upadh...@razorpay.com> wrote:
>>
>>> Hi Roman,
>>>
>>> I have successfully migrated to flink 1.8.2 with the savepoint created
>>> by flink 1.6.2.
>>> Now I have to modify few case classes due to new requirement I have
>>> created a savepoint and when I run the app with modified class from the
>>> savepoint it throws error "state not compatible"
>>> Previously there were no serializer used.
>>> I now wish to support state schema Hence need suggestion how can i
>>> achieve that ?
>>>
>>> Regards
>>>
>>> On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi ApoorvK,

 I understand that you have a savepoint created by Flink 1.6.2 and you
 want to use it with Flink 1.8.2. The classes themselves weren't modified.
 Is that correct?
 Which serializer did you use?

 Regards,
 Roman


 On Tue, Feb 25, 2020 at 8:38 AM ApoorvK 
 wrote:

> Hi Team,
>
> Earlier we have developed on flink 1.6.2 , So there are lots of case
> classes
> which have Map,Nested case class within them for example below :
>
> case class MyCaseClass(var a: Boolean,
>  var b: Boolean,
>  var c: Boolean,
>  var d: NestedCaseClass,
>  var e:Int){
> def this(){this(false,false,new NestedCaseClass,0)}
> }
>
>
> Now we have migrated to flink 1.8.2 , I need help to figure out how
> can I
> achieve state schema evolution for such classes.
>
> 1. Is creating avro for these classes now, and implement avro
> serialisation
> will that work ?
> 2. Or if I register kyroserialiser with protobuf serialiser at env?
>
> Please suggest what can be done here, or redirect for the avros
> serialisation example.
>
> Thanks
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>



Re: Help me understand this Exception

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi,

The exception stack you posted simply means that the next operator in the
chain failed to process the output watermark.
There should be another exception, which would explain why some operator
was closed / failed, eventually leading to the exception above.
That would provide more insight into exactly why your job is failing.

Cheers,
Gordon

On Tue, Mar 17, 2020 at 11:27 PM aj  wrote:

> Hi,
> I am running a streaming job with generating watermark like this :
>
> public static class SessionAssigner implements 
> AssignerWithPunctuatedWatermarks {
> @Override
> public long extractTimestamp(GenericRecord record, long 
> previousElementTimestamp) {
> long timestamp = (long) record.get("event_ts");
> LOGGER.info("timestamp", timestamp);
> return timestamp;
> }
>
> @Override
> public Watermark checkAndGetNextWatermark(GenericRecord record, long 
> extractedTimestamp) {
> // simply emit a watermark with every event
> LOGGER.info("extractedTimestamp ", extractedTimestamp);
> return new Watermark(extractedTimestamp);
> }
> }
>
> Please help me understand what this exception means:
>
> java.lang.RuntimeException: Exception occurred while processing valve
> output watermark:
> at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:216)
> at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(
> StatusWatermarkValve.java:189)
> at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processElement(StreamOneInputProcessor.java:169)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:143)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:279)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:301)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:406)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.
> collect(TimestampedCollector.java:51)
> at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 137)
> at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 116)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:50)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:32)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.emitWindowContents(WindowOperator.java:549)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.onEventTime(WindowOperator.java:457)
> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
> .advanceWatermark(InternalTimerServiceImpl.java:276)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager
> .advanceWatermark(InternalTimeServiceManager.java:128)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .processWatermark(AbstractStreamOperator.java:784)
> at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:213)
> ... 10 more
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> 
>


Re: Apache Airflow - Question about checkpointing and re-run a job

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi,

I believe that the title of this email thread was a typo, and should be
"Apache Flink - Question about checkpointing and re-run a job."
I assume this because the contents of the previous conversations seem to be
purely about Flink.

Otherwise, as far as I know, there don't seem to be any publicly available
Airflow operators for Flink right now.

Cheers,
Gordon





Re: what is the difference between map vs process on a datastream?

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi,

As David already explained, they are similar in that both process and
flatMap functions may output zero or more records per input record.

However, ProcessFunctions also expose much more powerful functionality to
the user, such as registering timers, outputting to side outputs, etc.
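
A small contrast, on a keyed stream (timers are only available on keyed
streams); the Event and Alert types are illustrative:

final OutputTag<Event> auditTag = new OutputTag<Event>("audit") {};

DataStream<Alert> alerts = events
    .keyBy(Event::getKey)
    .process(new KeyedProcessFunction<String, Event, Alert>() {
        @Override
        public void processElement(Event value, Context ctx, Collector<Alert> out) {
            // Like flatMap: emit zero or more records per input...
            if (value.isSuspicious()) {
                out.collect(new Alert(value));
            }
            // ...but additionally: route records to a side output,
            ctx.output(auditTag, value);
            // and register timers to react later (assuming timestamps are assigned).
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
            // Called when a previously registered timer fires.
        }
    });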

Cheers,
Gordon






Re: RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi John,

Have you considered letting the BroadcastProcessFunction output events that
indicate which extra external HTTP requests need to be performed, and having
them consumed by a downstream async I/O operator that completes the HTTP
requests?
That could work, depending on what exactly you need to do in your specific
case.
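
A rough sketch of that wiring; the HttpRequestDescriptor / HttpResult types,
the broadcast function, and the non-blocking HTTP client are all placeholders:

// 1) The broadcast function does no blocking I/O itself; it only emits
//    descriptors of the HTTP calls that still need to happen.
DataStream<HttpRequestDescriptor> requests = keyedStream
    .connect(ruleBroadcastStream)
    .process(new MyKeyedBroadcastFunction()); // emits HttpRequestDescriptor records

// 2) A downstream async I/O operator performs the actual calls.
DataStream<HttpResult> results = AsyncDataStream.unorderedWait(
    requests,
    new RichAsyncFunction<HttpRequestDescriptor, HttpResult>() {
        private transient MyNonBlockingHttpClient client;

        @Override
        public void open(Configuration parameters) {
            client = new MyNonBlockingHttpClient();
        }

        @Override
        public void asyncInvoke(HttpRequestDescriptor req, ResultFuture<HttpResult> future) {
            // Error handling omitted for brevity.
            client.get(req.url()).whenComplete(
                (response, error) -> future.complete(
                    Collections.singleton(new HttpResult(req, response))));
        }
    },
    30, TimeUnit.SECONDS, 100);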

Cheers,
Gordon





Re: Very large _metadata file

2020-03-04 Thread Tzu-Li (Gordon) Tai
Hi Jacob,

Apart from what Klou already mentioned, one other possible reason:

If you are using the FsStateBackend, it is also possible that your state is
small enough to be stored inline within the metadata file.
That is governed by the "state.backend.fs.memory-threshold" configuration,
with a default value of 1024 bytes; it can also be configured with the
`fileStateSizeThreshold` argument when constructing the `FsStateBackend`.
The purpose of that threshold is to ensure that the backend does not create
a large number of very small files, where the file pointers could potentially
be larger than the state itself.
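
For reference, this is roughly where that threshold can be set
programmatically (the exact constructor overloads differ a bit between Flink
versions, so please double-check against the Javadocs of the version you are
on):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// State chunks smaller than the threshold are written inline into the checkpoint's
// _metadata file; a threshold of 0 forces all state into separate files.
int fileStateSizeThreshold = 0;
env.setStateBackend(
    new FsStateBackend(URI.create("hdfs:///flink/checkpoints"), fileStateSizeThreshold));

// Alternatively, set `state.backend.fs.memory-threshold` in flink-conf.yaml.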

Cheers,
Gordon



On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas  wrote:

> Hi Jacob,
>
> Could you specify which StateBackend you are using?
>
> The reason I am asking is that, from the documentation in [1]:
>
> "Note that if you use the MemoryStateBackend, metadata and savepoint
> state will be stored in the _metadata file. Since it is
> self-contained, you may move the file and restore from any location."
>
> I am also cc'ing Gordon who may know a bit more about state formats.
>
> I hope this helps,
> Kostas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>
> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
> >
> > Per the documentation:
> >
> > "The meta data file of a Savepoint contains (primarily) pointers to all
> files on stable storage that are part of the Savepoint, in form of absolute
> paths."
> >
> > I somehow have a _metadata file that's 1.9GB. Running strings on it I
> find 962 strings, most of which look like HDFS paths, which leaves a lot of
> that file-size unexplained. What else is in there, and how exactly could
> this be happening?
> >
> > We're running 1.6.
> >
> > Jacob
>


Re: what is the hash function that Flink creates the UID?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

Flink currently performs a 128-bit murmur hash on the user-provided uids to
generate the final node hashes in the stream graph. Specifically, this
library is being used [1] as the hash function.

If what you are looking for is for Flink to use exactly the provided hash,
you can use `setUidHash` for that - Flink will use that provided uid hash as
is for the generated node hashes.
However, that was exposed as a manual workaround to allow for backwards
compatibility in cases where legacy hashes broke, so it is not advised to
use it in your case.
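
For illustration, this is roughly what such a 128-bit Murmur3 hash looks like
with the Guava library referenced in [1] (the exact bytes Flink feeds into
the hasher per node are an internal detail, so treat this purely as an
illustration of the hash function and of the 32-character hex format that
`setUidHash` expects):

HashFunction murmur = Hashing.murmur3_128(0);
String hash = murmur.hashString("my-operator-uid", StandardCharsets.UTF_8).toString();
System.out.println(hash); // a 32-character hex string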

BR,
Gordon

[1]
https://guava.dev/releases/19.0/api/docs/com/google/common/hash/Hashing.html#murmur3_128(int)





Re: java.time.LocalDateTime in POJO type

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

What that LOG means (i.e. "must be processed as a Generic Type") is that
Flink will have to fall back to using Kryo for the serialization of that
type.

You should be concerned about that if:
1) That type is being used for some persisted state in snapshots. That would
be the case if you've registered state of that type, or if it is used as the
input of some built-in operator that persists input records in state (e.g.
window operators). Kryo generally does not have a friendly schema evolution
story, so you would want to avoid that going into production.
2) Kryo itself is not the fastest compared to Flink's POJO serializer, so
that would be something to consider as well even if the type is only used
for transient, on-wire data.

I think in your case, since your POJO contains an inner field that cannot be
recognized as a POJO (i.e. the LocalDateTime), then your outer class is also
not recognized as a POJO.
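
If you want to catch such fallbacks early rather than discover them in
production, one option is to disable generic types, which makes the job fail
wherever Kryo would otherwise be used (a small sketch; MyPojo stands in for
your class):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Fails with an exception wherever a type would fall back to Kryo.
env.getConfig().disableGenericTypes();

// You can also inspect how Flink classifies a type up front:
TypeInformation<MyPojo> typeInfo = TypeInformation.of(MyPojo.class);
System.out.println(typeInfo); // PojoTypeInfo vs. GenericTypeInfo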

BR,
Gordon





Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi Kaymak,

To answer your last question:
there will be no data loss in that scenario you described, but there could
be duplicate processed records.

With checkpointing enabled, the Flink Kafka consumer does not commit
offsets back to Kafka until offsets in Flink checkpoints have been
persisted.

That external offset commit, however, is not guaranteed to happen, and it
always "lags" behind the offsets maintained internally in Flink checkpoints.
That is why there may be duplicate consumed records if you rely on those
offsets on startup, instead of the offsets maintained within Flink.

The rule of thumb is:
Offsets committed back to Kafka by the Flink Kafka consumer are only a means
to expose progress to the outside world,
and there is no guarantee that those committed offsets are consistent with
operator state in the streaming job.
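
For reference, this is roughly how those knobs look on the consumer (topic,
group id, and schema are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

// With checkpointing enabled, offsets are committed back to Kafka only when a
// checkpoint completes; this is purely informational for the outside world.
consumer.setCommitOffsetsOnCheckpoints(true);

// On a fresh start (no checkpoint/savepoint to restore from), begin from the
// group's committed offsets in Kafka.
consumer.setStartFromGroupOffsets();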

BR,
Gordon


On Mon, Mar 2, 2020, 11:18 PM Kaymak, Tobias 
wrote:

> Thank you! One last question regarding Gordons response. When a pipeline
> stops consuming and cleanly shuts down and there is no error during that
> process, and then it gets started again and uses the last committed offset
> in Kafka - there should be no data loss - or am I missing something?
>
> In what scenario should I expect a data loss? (I can only think of the
> jobmanager or taskmanager getting killed before the shutdown is done.)
>
> Best,
> Tobi
>
> On Mon, Mar 2, 2020 at 1:45 PM Piotr Nowojski  wrote:
>
>> Hi,
>>
>> Sorry for my previous slightly confusing response, please take a look at
>> the response from Gordon.
>>
>> Piotrek
>>
>> On 2 Mar 2020, at 12:05, Kaymak, Tobias  wrote:
>>
>> Hi,
>>
>> let me refine my question: My pipeline is generated from Beam, so the
>> Flink pipeline is a translated Beam pipeline. When I update my Apache Beam
>> pipeline code, working with a snapshot in Flink to stop the pipeline is not
>> an option, as the snapshot will use the old representation of the the Flink
>> pipeline when resuming from that snapshot.
>>
>> Meaning that I am looking for a way to drain the pipeline cleanly and
>> using the last committed offset in Kafka to resume processing after I
>> started it again (launching it through Beam will regenerate the Flink
>> pipeline and it should resume at the offset where it left of, that is the
>> latest committed offset in Kafka).
>>
>> Can this be achieved with a cancel or stop of the Flink pipeline?
>>
>> Best,
>> Tobias
>>
>> On Mon, Mar 2, 2020 at 11:09 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi Tobi,
>>>
>>> No, FlinkKafkaConsumer is not using committed Kafka’s offsets for
>>> recovery. Offsets where to start from are stored in the checkpoint itself.
>>> Updating the offsets back to Kafka is an optional, purely cosmetic thing
>>> from the Flink’s perspective, so the job will start from the correct
>>> offsets.
>>>
>>> However, if you for whatever the reason re-start the job from a
>>> savepoint/checkpoint that’s not the latest one, this will violate
>>> exactly-once guarantees - there will be some duplicated records committed
>>> two times in the sinks, as simply some records would be processed and
>>> committed twice. Committing happens on checkpoint, so if you are recovering
>>> to some previous checkpoint, there is nothing Flink can do - some records
>>> were already committed before.
>>>
>>> Piotrek
>>>
>>> On 2 Mar 2020, at 10:12, Kaymak, Tobias 
>>> wrote:
>>>
>>> Thank you Piotr!
>>>
>>> One last question - let's assume my source is a Kafka topic - if I stop
>>> via the CLI with a savepoint in Flink 1.9, but do not use that savepoint
>>> when restarting my job - the job would continue from the last offset that
>>> has been committed in Kafka and thus I would also not experience a loss of
>>> data in my sink. Is that correct?
>>>
>>> Best,
>>> Tobi
>>>
>>> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski 
>>> wrote:
>>>
 Yes, that’s correct. There shouldn’t be any data loss. Stop with
 savepoint is a solution to make sure, that if you are stopping a job
 (either permanently or temporarily) that all of the results are
 published/committed to external systems before you actually stop the job.

 If you just cancel/kill/crash a job, in some rare cases (if a
 checkpoint was completing at the time cluster was crashing), some records
 might not be committed before the cancellation/kill/crash happened. Also
 note that doesn’t mean there is a data loss, just those records will be
 published once you restore your job from a checkpoint. If you want to stop
 the job permanently, that might not happen, hence we need stop with
 savepoint.

 Piotrek

 On 28 Feb 2020, at 15:02, Kaymak, Tobias 
 wrote:

 Thank you! For understanding the matter: When I have a streaming
 pipeline (reading from Kafka, writing somewhere) and I click "cancel" and
 after that I restart the pipeline - I should not expect any data to be lost
 - is that correct?

 Best,
 Tobias

 On 

Re: Flink on AWS - ActiveMQ connector

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

The connectors that are listed in the AWS documentation page that you
referenced are not provided by AWS. They are bundled connectors shipped by
the Apache Flink community as part of official Flink releases, and are
discoverable as artifacts from the Maven central repository. See the
respective Flink connector documentation pages (for example [1] for Flink's
Apache Kafka connector) on how to use those connectors in your jobs.

As for the ActiveMQ connector provided by Apache Bahir, there's also a Maven
artifact for that shipped by Apache Bahir [2].

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html
[2] https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How is state stored in rocksdb?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

First of all, state is only managed by Flink (and therefore Flink's state
backends) if the state is registered by the user.
You can take a look at the documentation here [1] for details on how to register
state.
A state has to be registered for it to be persisted in checkpoints /
savepoints, and be fault-tolerant across Flink job restarts.
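
For reference, a rough Java sketch of registering a piece of keyed state (along
the lines of the running-average example in [1]; names and types here are just
illustrative):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // handle to the registered state; Flink checkpoints whatever is stored here
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average",  // state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));  // type info / serializer
        sum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> current = sum.value();
        if (current == null) {
            current = Tuple2.of(0L, 0L);
        }
        current.f0 += 1;          // count
        current.f1 += input.f1;   // running sum
        sum.update(current);
        if (current.f0 >= 2) {
            out.collect(Tuple2.of(input.f0, current.f1 / current.f0));
            sum.clear();
        }
    }
}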

As for the second part of your question on serialization:
Once you take a look at how to register state to be managed by Flink, you'd
quickly realize that you can specify the serializer for registered state. If
you simply provide the class of the state data type, then Flink will use its
own type extraction to figure out the serializer to use for the type. Please
see [2] for details on that. Otherwise, a custom serializer implementation
can be provided.
In general, you can find quite a bit about state serialization here [3].

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#using-managed-keyed-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Correct way to e2e test a Flink application?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi Laurent,

You can take a look at Flink's MiniClusterResource JUnit test rule, and its
usages in the codebase for that.
The rule launches a Flink MiniCluster within the same JVM, and submission to
the mini cluster resembles how you would submit to an actual Flink
cluster, so you would already be able to catch problems such as operator
serialization errors.
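
A minimal sketch of how the rule is typically wired into a JUnit test (class
names and the toy job below are just placeholders):

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class MyPipelineITCase {

    // starts a Flink MiniCluster in the same JVM for all tests in this class
    @ClassRule
    public static final MiniClusterWithClientResource FLINK_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    @Test
    public void testPipeline() throws Exception {
        // getExecutionEnvironment() picks up the mini cluster started by the rule
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // build the same topology your application builds (ideally via a shared method)
        env.fromElements(1, 2, 3).print();

        env.execute();  // submission goes through the same code paths as a real cluster
    }
}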

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Writing a POJO Schema Evolution E2E test in Java

2020-02-20 Thread Tzu-Li (Gordon) Tai
Hi Theo,

This is indeed a tricky feature to test!

On Thu, Feb 20, 2020 at 8:59 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi,
>
> We have a pipeline which internally uses Java POJOs and also needs to keep
> some events entirely in state for some time.
>
> From time to time, our POJOs evolve, like attributes are added or removed.
>
> Now I wanted to write an E2E test that proves the schema migration works
> (Having different schemas in source kafka topic, flink pipeline state and
> sink) for bounded scenarios (attribute added or removed)
>
> I figured out that in my test, I can instantiate a
> MiniClusterWithClientResource, receive a client, start a job over the
> client and also cancel the job with a savepoint. My idea was to start the
> job, put some records in, cancel with a savepoint and restart the job from
> savepoint, but with a slightly different POJO (added another attribute and
> removed an existing one).
>
> Currently, I'm sadly missing two pieces:
> 1. I don't see a way to restart a job from a savepoint via the client
> obtained from the MiniClusterWithClientResource in my test
> 2. According to a Flink blog post [1], schema evolution of POJOs is more
> limited; especially, the evolved POJO must have the same "namespace" (i.e.
> java package?!) and class name.
>

The way this is sort of overcome by tests in Flink also surrounding schema
/ serializer evolution is to have two different classes (with different
classnames) and reload them in new classloaders so that they can be
"relocated" to have the same names at runtime.
In Flink, we use a `ClassRelocator` utility to do this. You can check out
example usages of it in the `PojoSerializerUpgradeTest` and
`TypeSerializerUpgradeTestBase`.

I'm not entirely sure if it would work in your scenario, but I think it's
worth giving it a try since it'll make writing such tests easier.

If this doesn't work, then you could try doing it such that you have
separate modules (i.e. jars) for the old / new Pojo definition, and then a
separate module that does the actual test logic while loading the jars
containing the old / new Pojos with different classloaders.
That would resemble what happens in reality more closely.


> Especially point 2 seems to make it impossible for me to automate testing
> of the evolution, so I'd need to do it manually.
>
> Do you have any idea how I could overcome these limitations so that I can
> build a proper end to end test for the schema migration to work?
>
> Best regards
> Theo
>
> [1]
> https://flink.apache.org/news/2020/01/29/state-unlocked-interacting-with-state-in-apache-flink.html
>

Hope that helps! Would be great to hear back from you on how it works out.

Cheers,
Gordon


Re: State Processor API Keyed State

2020-02-18 Thread Tzu-Li (Gordon) Tai
There might be a possible workaround for this, for now:

Basically, the trick is to explicitly tell the State Processor API to use a
specified type information to access the keyed state.
You can do that with the `ExistingSavepoint#readKeyedState(String uid,
KeyedStateReaderFunction function, TypeInformation keyTypeInfo,
TypeInformation outTypeInfo)`.
This would allow the State Processor API to bypass the Java type
information extraction process (which is not compatible with how it is done
in Scala DataStream right now, hence the StateMigrationException you are
getting).

What you'd have to do, is in your pipeline job, explicitly generate the
serializer / type information using either the Scala DataStream macro
`createTypeInformation` or just use a custom serializer.
Then, specify to use that serializer / type info when reading keyed state
with the State Processor API.
Simply put: you'll be specifying explicitly what serializer to use for the
keys, and tell the State Processor API to also use that serializer to
access state.

This is not nice, but should work for now. Would be interesting to hear how
that works out for you.
As mentioned above, eventually a possible ideal solution is that type
information extraction should be converged for the Java / Scala DataStream
APIs.

Cheers,
Gordon

On Wed, Feb 19, 2020 at 10:20 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> Just to clarify -
> I quickly went through the README of the project, and saw this:
> "This error is seen after trying to read from a savepoint that was created
> using the same case class as a key."
>
> So, if I understood correctly, you were attempting to use the State
> Processor API to access a savepoint that was written with a Scala
> DataStream job, correct?
>
> If that's the case, I'm afraid this would not work as of now. See [1] for
> a similar scenario that others had also bumped into.
> TL;DR is - the State Processor API currently is not guaranteed to work for
> snapshots that are written with Scala DataStream jobs.
>
> For now, I'll add a big warning about this to the docs.
> But in general, it seems like we might want to consider bumping up the
> priority for enabling this, as quite a few users are using the Scala
> DataStream API for their jobs.
>
> Just as a side comment: this repo looks like a very interesting project!
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-15719
>
> On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe  wrote:
>
>> Hey all,
>>
>> I've run into an issue with the State Processor API. To highlight the
>> issues I've been having, I've created a reference repository that will
>> demonstrate the issue (repository:
>> https://github.com/segmentio/flink-state-management).
>>
>> The current implementation of the pipeline has left us with keyed state
>> that we no longer need, and we don't have references to some of the old keys.
>> My plan was to:
>> 1. create a savepoint
>> 2. read the keys from each operator (using State Processor API)
>> 3. filter out all the keys that are no longer used
>> 4. bootstrap a new savepoint that contains the filtered state
>>
>> I managed to get this working using a sample pipeline and a very basic
>> key (a string), but when I switched the key to be something more complex (a
>> case class of two strings), I started seeing this exception:
>> Caused by: org.apache.flink.util.StateMigrationException: The new key
>> serializer must be compatible.
>> at
>> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>> ... 13 more
>>
>> Has anyone come across this before and figured out a fix? Any help you
>> can give would be greatly appreciated!
>>
>> Thanks,
>> --
>> <http://segment.com/>
>> Mark Niehe ·  Software Engineer
>> Integrations
>> <https://segment.com/catalog?utm_source=signature_medium=email>  ·
>> Blog <https://segment.com/blog?utm_source=signature_medium=email>
>>   ·  We're Hiring!
>> <https://segment.com/jobs?utm_source=signature_medium=email>
>>
>


Re: State Processor API Keyed State

2020-02-18 Thread Tzu-Li (Gordon) Tai
Hi,

Just to clarify -
I quickly went through the README of the project, and saw this:
"This error is seen after trying to read from a savepoint that was created
using the same case class as a key."

So, if I understood correctly, you were attempting to use the State
Processor API to access a savepoint that was written with a Scala
DataStream job, correct?

If that's the case, I'm afraid this would not work as of now. See [1] for a
similar scenario that others had also bumped into.
TL;DR is - the State Processor API currently is not guaranteed to work for
snapshots that are written with Scala DataStream jobs.

For now, I'll add a big warning about this to the docs.
But in general, it seems like we might want to consider bumping up the
priority for enabling this, as quite a few users are using the Scala
DataStream API for their jobs.

Just as a side comment: this repo looks like a very interesting project!

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-15719

On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe  wrote:

> Hey all,
>
> I've run into an issue with the State Processor API. To highlight the
> issues I've been having, I've created a reference repository that will
> demonstrate the issue (repository:
> https://github.com/segmentio/flink-state-management).
>
> The current implementation of the pipeline has left us with keyed state
> that we no longer need, and we don't have references to some of the old keys.
> My plan was to:
> 1. create a savepoint
> 2. read the keys from each operator (using State Processor API)
> 3. filter out all the keys that are no longer used
> 4. bootstrap a new savepoint that contains the filtered state
>
> I managed to get this working using a sample pipeline and a very basic key
> (a string), but when I switched the key to be something more complex (a
> case class of two strings), I started seeing this exception:
> Caused by: org.apache.flink.util.StateMigrationException: The new key
> serializer must be compatible.
> at
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
> ... 13 more
>
> Has anyone come across this before and figured out a fix? Any help you can
> give would be greatly appreciated!
>
> Thanks,
> --
> Mark Niehe · Software Engineer · Integrations
>


Re: Issue with committing Kafka offsets

2020-01-31 Thread Tzu-Li (Gordon) Tai
Hi,

There are no upper limits on the number of Kafka consumers per job.

For each one of your FlinkKafkaConsumers, are you using the same group.id?
That could maybe explain why you are experiencing higher commit times as
you are adding more FlinkKafkaConsumers, as AFAIK on the broker side, the
commit operations for the same consumer group are enqueued together.

As a side note, as the warning message already mentions, this does not
affect Flink's exactly-once guarantees.
If the only reason that you want to commit the offsets back to Kafka is to
have a way to monitor progress, it should be fine to define different
consumer group ids for each FlinkKafkaConsumer.
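
For example, something along these lines (a snippet, not a complete class;
broker address, topic names and group ids are placeholders, and the universal
Kafka connector is assumed):

Properties propsA = new Properties();
propsA.setProperty("bootstrap.servers", "broker:9092");
propsA.setProperty("group.id", "my-job-topic-a");   // distinct group per source

Properties propsB = new Properties();
propsB.setProperty("bootstrap.servers", "broker:9092");
propsB.setProperty("group.id", "my-job-topic-b");

FlinkKafkaConsumer<String> sourceA =
        new FlinkKafkaConsumer<>("topic-a", new SimpleStringSchema(), propsA);
FlinkKafkaConsumer<String> sourceB =
        new FlinkKafkaConsumer<>("topic-b", new SimpleStringSchema(), propsB);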

Hope this helps,
Gordon

On Sat, Feb 1, 2020 at 12:54 AM RKandoji  wrote:

> Can someone please help me here.
>
> Thanks
> RK
>
>
> On Thu, Jan 30, 2020 at 7:51 PM RKandoji  wrote:
>
>> Hi Team,
>>
>> I'm running into a strange issue, pasted below:
>>
>> Committing offsets to Kafka takes longer than the checkpoint interval.
>> Skipping commit of previous offsets because newer complete checkpoint
>> offsets are available. This does not compromise Flink's checkpoint
>> integrity.
>>
>>
>> I read data from more than 10 different Kafka topics, and I started noticing
>> this issue as I integrated more KafkaConsumers reading from their
>> respective topics.
>>
>> Wondering if there is any upper limit on the number of Kafka consumers
>> (Kafka topics) per job?
>>
>> If not could someone please shed some light on why this could be
>> happening?
>>
>> Thanks,
>> RK
>>
>


Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-30 Thread Tzu-Li (Gordon) Tai
Update:
I can confirm my previous guess based on the changes in
https://issues.apache.org/jira/browse/FLINK-4280 that was merged for Flink
1.3.0.
When upgrading from Flink 1.2.x -> 1.3.0, the new startup position
configurations were respected over the checkpointed offsets (only once for
the first restore after upgrade).
After that, all restores from savepoints would only ever respect the
checkpointed offsets (regardless of whether or not it was the first restore
after upgrade).
This would explain the behaviour you encountered.

If you actually prefer to not have your Kafka consumer progress carried
over after the upgrade and want to just start consuming from the latest
offset,
one way to achieve that is to assign a new uid to the Kafka consumer
operator, and allow non-restored state when restoring.
With this change, Flink should consider the Kafka consumer operator to not
have any prior snapshotted state (i.e. offsets) and respect the startup
configuration.
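
A rough sketch of what that looks like (uid, topic and properties are
placeholders; the universal Kafka connector is assumed):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "broker:9092");
kafkaProps.setProperty("group.id", "my-group");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), kafkaProps);
// respected on restore, because the operator no longer has matching snapshotted state
consumer.setStartFromLatest();

env.addSource(consumer)
        .uid("kafka-source-v2")   // new uid, different from the one used before the upgrade
        .name("Kafka Source");

and then restore while allowing non-restored state, e.g.:

bin/flink run -s <savepointPath> --allowNonRestoredState <jobJar>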

Let me know if this works for you!

Cheers,
Gordon

On Thu, Jan 23, 2020 at 9:12 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Somya,
>
> I'll have to take a closer look at the JIRA history to refresh my memory
> on potential past changes that caused this.
>
> My first suspection is this:
> It is expected that the Kafka consumer will *ignore* the configured
> startup position if the job was restored from a savepoint.
> It will always use the offsets that were persisted at the time of the
> savepoint.
> Would this probably already explain what you are seeing?
>
> What I'm not sure of yet is whether this was a behavioural change that
> occurred between versions 1.2.x and 1.3.x or later versions.
> I'll take a closer look once I'm back from travelling tomorrow and get
> back to you on that.
>
> Cheers,
> Gordon
>
> On Thu, Jan 23, 2020, 7:52 PM Chesnay Schepler  wrote:
>
>> @gordon Do you remember whether we changed any behavior of the Kafka 0.10
>> consumer after 1.3.3?
>>
>> On 23/01/2020 12:02, Somya Maithani wrote:
>>
>> Hey,
>>
>> Any ideas about this? We are blocked on the upgrade because we want async
>> timer checkpointing.
>>
>> Regards,
>>
>> Somya Maithani
>> Software Developer II
>> Helpshift Pvt Ltd
>>
>>
>> On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani <
>> somyamaithan...@gmail.com> wrote:
>>
>>> Hey Team,
>>>
>>> *Problem*
>>> Recently, we were trying to upgrade Flink infrastructure to version
>>> 1.9.1 and we noticed that a week old offset was consumed from Kafka even
>>> though the configuration says latest.
>>>
>>> *Pretext*
>>> 1. Our current Flink version in production is 1.2.1.
>>> 2. We use RocksDB + Hadoop as our backend / checkpointing data store.
>>> 3. We consume and produce messages to / from Kafka.
>>>
>>> *Release Plan*
>>> 1. Upgrade Flink 1.2.1 to 1.3.3.
>>> 2. Upgrade Flink 1.3.3 to 1.9.1
>>> Note: We have a transitioning version (1.3.3) because of the
>>> serialisation change in checkpointing.
>>>
>>> After performing step 1, the service was consuming latest Kafka events
>>> but after performing step 2 we noticed that the service was consuming one
>>> week old Kafka messages from the source topic. We did not see any
>>> exceptions but since the number of messages consumed increased a lot for
>>> our Flink infrastructure, our task managers started crashing eventually.
>>>
>>> We did not change Kafka configuration in the service for the upgrade but
>>> we did upgrade the Flink dependencies for Kafka.
>>>
>>> Old dependency:
>>>
>>> 
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-streaming-java_2.10</artifactId>
>>>>   <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-clients_2.10</artifactId>
>>>>   <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>>>>   <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>>>>   <version>${flink.version}</version>
>>>> </dependency>
>>>>
>>>
>>>
>>> New dependency:
>>>
>>> 
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-streaming-java_2.12</artifactId>
>>>>   <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-clients_2.12</artifactId>
>>>>   <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>>>>   <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>>>>   <version>${flink.version}</version>
>>>> </dependency>
>>>>
>>>
>>>
>>> Do we know why this would be happening?
>>>
>>> Regards,
>>>
>>> Somya Maithani
>>> Software Developer II
>>> Helpshift Pvt Ltd
>>>
>>
>>


Re: fliter and flatMap operation VS only a flatMap operation

2020-01-29 Thread Tzu-Li (Gordon) Tai
Hi,

If your filter and flatMap operators are chained, then the performance
difference should not be noticeable.
If a shuffle (i.e. a keyBy operation) occurs after the filter and before
the flatMap, then applying the filter first will be more efficient.
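
To illustrate with a small made-up example (a sketch; the strings and splitting
logic are arbitrary) -- here the filter and flatMap form one chained task, so
splitting the logic into two operators should cost next to nothing:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> words = env.fromElements("a", "bb", "ccc");

words.filter(s -> s.length() > 1)                        // cheap pre-filter
        .flatMap((String s, Collector<String> out) -> {  // transformation on surviving records
            for (char c : s.toCharArray()) {
                out.collect(String.valueOf(c));
            }
        })
        .returns(Types.STRING)                           // needed because of lambda type erasure
        .print();

// If a keyBy(...) sat between the two operations, filtering first would also
// avoid shuffling records that get dropped anyway.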

Cheers,
Gordon

On Thu, Jan 30, 2020 at 4:03 AM Soheil Pourbafrani 
wrote:

> Hi,
>
> In case we need a filter operation followed by a transformation, which
> one is more efficient in Flink, applying the filter operation first and
> then a flatMap operation separately OR using only a flatMap operation that
> internally includes the filter logic, too?
>
> best
> Soheil
>


Re: FsStateBackend vs RocksDBStateBackend

2020-01-29 Thread Tzu-Li (Gordon) Tai
Hi Ran,

On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang  wrote:

> Hi all,
>
> We have a Flink app that uses a KeyedProcessFunction, and in the function
> it requires a ValueState(of TreeSet) and the processElement method needs to
> access and update it. We tried to use RocksDB as our stateBackend but the
> performance is not good, and intuitively we think it was because of the
> serialization / deserialization on each processElement call.
>

As you have already pointed out, serialization behaviour is a major
difference between the 2 state backends, and will directly impact
performance due to the extra runtime overhead in RocksDB.
If you plan to continue using the RocksDB state backend, make sure to use
MapState instead of ValueState where possible, since every access to the
ValueState in the RocksDB backend requires serializing / deserializing the
whole value.
For MapState, de-/serialization happens per K-V access. Whether or not this
makes sense would of course depend on your state access pattern.
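
As a rough illustration of the difference (a snippet; state names and types are
just examples):

// ValueState holding a whole collection: every value() / update() call
// de-/serializes the entire TreeSet in the RocksDB backend.
ValueStateDescriptor<TreeSet<Long>> valueDescriptor =
        new ValueStateDescriptor<>(
                "events-as-value",
                TypeInformation.of(new TypeHint<TreeSet<Long>>() {}));

// MapState alternative: each map entry is de-/serialized on its own,
// so touching one entry does not touch the rest of the state.
MapStateDescriptor<Long, Boolean> mapDescriptor =
        new MapStateDescriptor<>("events-as-map", Types.LONG, Types.BOOLEAN);

// in open() of the KeyedProcessFunction:
//   events = getRuntimeContext().getMapState(mapDescriptor);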


> Then we tried to switch to use FsStateBackend (which keeps the in-flight
> data in the TaskManager’s memory according to doc), and it could resolve
> the performance issue. *So we want to understand better what are the
> tradeoffs in choosing between these 2 stateBackend.* Our checkpoint size
> is 200 - 300 GB in stable state. For now we know one benefits of RocksDB is
> it supports incremental checkpoint, but would love to know what else we are
> losing in choosing FsStateBackend.
>

As of now, feature-wise both backends support asynchronous snapshotting,
state schema evolution, and access via the State Processor API.
In the end, the major factor for deciding between the two state backends
would be your expected state size.
That being said, it could be possible in the future that savepoint formats
for the backends are changed to be compatible, meaning that you will be
able to switch between different backends upon restore [1].


>
> Thanks a lot!
> Ran Zhang
>

Cheers,
Gordon

 [1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State


Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-23 Thread Tzu-Li (Gordon) Tai
Hi Somya,

I'll have to take a closer look at the JIRA history to refresh my memory on
potential past changes that caused this.

My first suspection is this:
It is expected that the Kafka consumer will *ignore* the configured startup
position if the job was restored from a savepoint.
It will always use the offsets that were persisted at the time of the
savepoint.
Would this probably already explain what you are seeing?

What I'm not sure of yet is whether this was a behavioural change that
occurred between versions 1.2.x and 1.3.x or later versions.
I'll take a closer look once I'm back from travelling tomorrow and get back
to you on that.

Cheers,
Gordon

On Thu, Jan 23, 2020, 7:52 PM Chesnay Schepler  wrote:

> @gordon Do you remember whether we changed any behavior of the Kafka 0.10
> consumer after 1.3.3?
>
> On 23/01/2020 12:02, Somya Maithani wrote:
>
> Hey,
>
> Any ideas about this? We are blocked on the upgrade because we want async
> timer checkpointing.
>
> Regards,
>
> Somya Maithani
> Software Developer II
> Helpshift Pvt Ltd
>
>
> On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani 
> wrote:
>
>> Hey Team,
>>
>> *Problem*
>> Recently, we were trying to upgrade Flink infrastructure to version 1.9.1
>> and we noticed that a week old offset was consumed from Kafka even though
>> the configuration says latest.
>>
>> *Pretext*
>> 1. Our current Flink version in production is 1.2.1.
>> 2. We use RocksDB + Hadoop as our backend / checkpointing data store.
>> 3. We consume and produce messages to / from Kafka.
>>
>> *Release Plan*
>> 1. Upgrade Flink 1.2.1 to 1.3.3.
>> 2. Upgrade Flink 1.3.3 to 1.9.1
>> Note: We have a transitioning version (1.3.3) because of the
>> serialisation change in checkpointing.
>>
>> After performing step 1, the service was consuming latest Kafka events
>> but after performing step 2 we noticed that the service was consuming one
>> week old Kafka messages from the source topic. We did not see any
>> exceptions but since the number of messages consumed increased a lot for
>> our Flink infrastructure, our task managers started crashing eventually.
>>
>> We did not change Kafka configuration in the service for the upgrade but
>> we did upgrade the Flink dependencies for Kafka.
>>
>> Old dependency:
>>
>> 
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-java_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-clients_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>>
>>
>>
>> New dependency:
>>
>> 
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-java_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-clients_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>>
>>
>>
>> Do we know why this would be happening?
>>
>> Regards,
>>
>> Somya Maithani
>> Software Developer II
>> Helpshift Pvt Ltd
>>
>
>


Re: Questions of "State Processing API in Scala"

2020-01-20 Thread Tzu-Li (Gordon) Tai
Hi Izual,

Thanks for reporting this! I'm also forwarding this to the user mailing
list, as that is the more suitable place for this question.

I think the usability of the State Processor API in Scala is indeed
something that hasn’t been looked at closely yet.

On Tue, Jan 21, 2020 at 8:12 AM izual  wrote:

> Hi community,
>
> When I use state in Scala, something confuses me. I followed these
> steps to generate and read states:
>
> a. implement the example [1] `CountWindowAverage` in Scala (exactly the same),
> and run jobA => that works fine.
>
> b. execute `flink cancel -s ${JobID}` => a savepoint was generated as
> expected.
>
> c. implement the example [2] `StatefulFunctionWithTime` in Scala (code
> below), and run jobB => it failed; the exception shows "Caused by:
> org.apache.flink.util.StateMigrationException: The new key serializer must
> be compatible."
>
>
> ReaderFunction code as below:
>
> ```
>
>   class ReaderFunction extends KeyedStateReaderFunction[Long, (Long,
> Long)] {
>
> var countState: ValueState[(Long, Long)] = _
>
> override def open(parameters: Configuration): Unit = {
>
>   val stateDescriptor = new ValueStateDescriptor("average",
> createTypeInformation[(Long, Long)])
>
>   countState = getRuntimeContext().getState(stateDescriptor)
>
> }
>
> override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context,
> out: Collector[(Long, Long)]): Unit = {
>
>   out.collect(countState.value())
>
> }
>
>   }
>
> ```
>
> d. then I tried to use java.lang.Long instead of Long as the key type, and ran
> jobB => the exception just disappeared and it works fine.
>
> This makes me confused. Did I miss some features in State-Processing-API,
> such as `magic-implicits`?
>

This part is explainable. The "magic-implicits" actually happen in the
DataStream Scala API.
Any primitive Scala types will be inferred and serialized as their Java
counterparts.
AFAIK, this would not happen in the State Processor API yet, which is why
you are getting the StateMigrationException.
When using Scala types directly with the State Processor API, I would guess
that Kryo (as a generic fallback) was being used to access state.
This can probably be confirmed by looking at the exception stack trace. Can
you post a full copy of that?

This should be resolvable by properly supporting Scala for the State
Processor API, but it's just that up to this point, we didn't have a plan
for that yet.
Can you open a JIRA for this? I think it'll be a reasonable extension to
the API.


>
> And when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`, the same exception comes
> again. This time I tried to use Tuple(java.lang.Long) or something else, but
> it does not work.
>

I'm not sure what you mean here. Where is this keyBy happening? In the
Scala DataStream job, or the State Processor API?


>
> Hope.
>
> 1:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>
> 2:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state


Cheers,
Gordon


Re: State name uniqueness

2020-01-20 Thread Tzu-Li (Gordon) Tai
Hi Vasily,

State names need to be unique within operators only.
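
For example (a small sketch, applied on a keyed stream), two different operators
in the same job can both register a state named "count":

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;

public class CounterA extends RichMapFunction<String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        // "count" only needs to be unique within this operator
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public Long map(String value) throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);
        return count.value();
    }
}

// A second operator elsewhere in the same job can also register
//   new ValueStateDescriptor<>("count", Types.LONG)
// without clashing with the state above.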

Cheers,
Gordon

On Mon, Jan 20, 2020 at 10:58 AM Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi all,
>
> I'm a bit confused with state name uniqueness.
> Should it be unique within operator only, or within entire job?
>
> Best regards,
> Vasily Melnik
>


Re: possible backwards compatibility issue between 1.8->1.9?

2019-11-18 Thread Tzu-Li (Gordon) Tai
Hi Bekir,

Before diving deeper, just to rule out the obvious:
Have you changed anything with the element type of the input stream to the
async wait operator?

This wasn't apparent from the information so far, so I want to quickly
clear that out of the way first.

Cheers,
Gordon

On Wed, Oct 30, 2019 at 11:52 PM Bekir Oguz  wrote:

> Hi guys,
> during our upgrade from 1.8.1 to 1.9.1, one of our jobs fails to start with
> the following exception. We deploy the job with 'allow-non-restored-state'
> option and from the latest checkpoint dir of the 1.8.1 version.
>
> org.apache.flink.util.StateMigrationException: The new state typeSerializer
> for operator state must not be incompatible.
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend
> .getListState(DefaultOperatorStateBackend.java:323)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend
> .getListState(DefaultOperatorStateBackend.java:214)
> at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
> .initializeState(AsyncWaitOperator.java:272)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:281)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(
> StreamTask.java:881)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> .java:395)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
>
> We see from the Web UI that the 'async wait operator' is causing this,
> which was not changed at all during this upgrade.
>
> All other jobs migrated without problems; only this one is failing. Has
> anyone else experienced this during migration?
>
> Regards,
> Bekir Oguz
>


[ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink 1.9.0, which is the latest major release.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this new major release:
https://flink.apache.org/news/2019/08/22/release-1.9.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344601

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Fwd: [VOTE] Apache Flink 1.9.0, release candidate #3

2019-08-19 Thread Tzu-Li (Gordon) Tai
Hi!

Voting on RC3 for Apache Flink 1.9.0 has started:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-1-9-0-release-candidate-3-td31988.html

Please check this out if you want to verify your applications against this
new Flink release.

Cheers,
Gordon

-- Forwarded message -
From: Tzu-Li (Gordon) Tai 
Date: Tue, Aug 20, 2019 at 1:16 AM
Subject: [VOTE] Apache Flink 1.9.0, release candidate #3
To: dev 


Hi all,

Release candidate #3 for Apache Flink 1.9.0 is now ready for your review.

Please review and vote on release candidate #3 for version 1.9.0, as
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to be
deployed to dist.apache.org [2], which are signed with the key with
fingerprint 1C1E2394D3194E1944613488F320986D35C33D6A [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag “release-1.9.0-rc3” [5].
* pull requests for the release note documentation [6] and announcement
blog post [7].

As proposed in the RC2 vote thread [8], for RC3 we are only cherry-picking
minimal specific changes on top of RC2 to be able to reasonably carry over
previous testing efforts and effectively require a shorter voting time.

The only extra commits in this RC, compared to RC2, are the following:
- c2d9aeac [FLINK-13231] [pubsub] Replace Max outstanding acknowledgement
ids limit with a FlinkConnectorRateLimiter
- d8941711 [FLINK-13699][table-api] Fix TableFactory doesn’t work with DDL
when containing TIMESTAMP/DATE/TIME types
- 04e95278 [FLINK-13752] Only references necessary variables when
bookkeeping result partitions on TM

Due to the minimal set of changes, the vote for RC3 will be *open for only
48 hours*.
Please cast your votes before *Aug. 21st (Wed.) 2019, 17:00 PM CET*.

It is adopted by majority approval, with at least 3 PMC affirmative votes.

Thanks,
Gordon

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344601
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc3/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1236
[5]
https://gitbox.apache.org/repos/asf?p=flink.git;a=tag;h=refs/tags/release-1.9.0-rc3
[6] https://github.com/apache/flink/pull/9438
[7] https://github.com/apache/flink-web/pull/244
[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Release-1-9-0-release-candidate-2-tp31542p31933.html


Fwd: [VOTE] Apache Flink Release 1.9.0, release candidate #2

2019-08-09 Thread Tzu-Li (Gordon) Tai
Hi!

Voting on RC2 for Apache Flink 1.9.0 has started:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Release-1-9-0-release-candidate-2-td31542.html

Please check this out if you want to verify your applications against this
new Flink release.

Best,
Gordon

-- Forwarded message -
From: Tzu-Li (Gordon) Tai 
Date: Fri, Aug 9, 2019 at 6:17 PM
Subject: [VOTE] Apache Flink Release 1.9.0, release candidate #2
To: dev 


Hi all,

Release candidate #2 for Apache Flink 1.9.0 is now ready for your review.
This is the first voting candidate for 1.9.0, following the preview
candidates RC0 and RC1.

Please review and vote on release candidate #2 for version 1.9.0, as
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to be
deployed to dist.apache.org [2], which are signed with the key with
fingerprint 1C1E2394D3194E1944613488F320986D35C33D6A [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag “release-1.9.0-rc2” [5].

Robert is also preparing a pull request for the announcement blog post,
and will update this voting thread with a link to the pull request
shortly afterwards.

The vote will be open for *at least 72 hours*.
Please cast your votes before *Aug. 14th (Wed.) 2019, 17:00 PM CET*.
It is adopted by majority approval, with at least 3 PMC affirmative votes.

Thanks,
Gordon

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344601
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1234
[5]
https://gitbox.apache.org/repos/asf?p=flink.git;a=tag;h=refs/tags/release-1.9.0-rc2


Re: Restore state class not found exception in 1.8

2019-08-06 Thread Tzu-Li (Gordon) Tai
Hi Lasse,

I think the diagnosis here:
https://issues.apache.org/jira/browse/FLINK-13159 matches your problem.
This problem should be fixed in the next bugfix version for 1.8.x. We'll
also try to fix this for the upcoming 1.9.0 as well.

Cheers,
Gordon

On Mon, Jun 3, 2019 at 1:55 PM Lasse Nedergaard 
wrote:

> Hi Gordon
>
> To us it looks like the env.registerclass is needed when we write the save
> point. If we have an existing save point without the classes registered it
> doesn’t work.
>
> We have only seen the exception in our own sink that stores pending data in
> operator state through the CheckpointedFunction interface, and this sink isn't
> used in all our jobs.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On 3 Jun 2019, at 12:50, Tzu-Li (Gordon) Tai  wrote:
>
> Hi Lasse,
>
> This is indeed a bit odd. I'll need to reproduce this locally before I can
> figure out the root problem. Please bear with me for a while, will get back
> to you on this.
>
> Meanwhile, you mentioned that you only had some jobs failing with the
> posted exception. Did you figure out any more details on why this was only
> partially happening?
>
> Cheers,
> Gordon
>
> On Tue, May 28, 2019 at 8:59 PM Lasse Nedergaard <
> lassenederga...@gmail.com> wrote:
>
>> Hi Gordon
>>
>> We have found a solution but not why it happens on 1.8.
>> For it to work we need to call
>> Env.registertype(Reportmessage.class)
>>
>> Reportmessage extends ReportmessageBase and the state operator uses
>> ReportmessageBase.
>> So we need to register all the class’s that extends a class used in
>> state. Don’t know why this is needed in 1.8
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>
>> On 28 May 2019, at 10:06, Tzu-Li (Gordon) Tai  wrote:
>>
>> Hi Lasse,
>>
>> Did you move the class to a different namespace / package or change it to
>> be a nested class, across the Flink versions?
>> That would be the only cause I could reason about at the moment.
>>
>> If possible, could you also have a very minimal snippet / instructions on
>> how I can maybe reproduce this?
>> That might give me more insight.
>>
>> Cheers,
>> Gordon
>>
>> On Mon, May 27, 2019 at 7:52 PM Lasse Nedergaard <
>> lassenederga...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> When we restart some of our jobs from a savepoint we see the the
>>> exception below. It only happens for some of our jobs and we didn't see it
>>> in 1.7.2. The class Flink can't find differ from job to job and we are sure
>>> it's included in our Fat jar.
>>> As a side note we are on our way to use Avro instead of POJO, but are
>>> not there yet.
>>> If anyone have a clue what the root cause could be, and how to resolve
>>> it would be appreciated.
>>> Thanks in advance
>>>
>>> Lasse Nedergaard
>>>
>>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
>>> state backend for StreamSink_609b5f7fc746f29234b038c121356a9b_(2/2) from 
>>> any of the 1 provided restore options.
>>> at 
>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:255)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
>>> ... 5 more
>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
>>> when trying to restore operator state backend
>>> at 
>>> org.apache.flink.runtime.state.DefaultOperatorS

Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Tzu-Li (Gordon) Tai
I quickly checked the implementation of duplicate() for both the
KryoSerializer and StreamElementSerializer (which are the only serializers
involved here).
They seem to be correct; especially for the KryoSerializer, since
FLINK-8836 [1] we now always perform a deep copy of the KryoSerializer when
duplicating it, and therefore Kryo instances should not be shared at all
across duplicates.
This seems to rule out any duplication issues with the serializers.

As a maybe relevant question, @Fabian do you register any types /
serializers via ExecutionConfig.registerKryoType(...) /
ExecutionConfig.registerTypeWithKryoSerializer(...)?
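
(For clarity, I mean registrations of this form somewhere in your job setup --
a minimal sketch; ObjectNode here is just the Jackson type from your stack
traces, and MyKryoSerializer is a hypothetical custom serializer class:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// plain type registration with Kryo
env.getConfig().registerKryoType(com.fasterxml.jackson.databind.node.ObjectNode.class);

// or a registration that binds a custom Kryo serializer to the type
// env.getConfig().registerTypeWithKryoSerializer(
//         com.fasterxml.jackson.databind.node.ObjectNode.class, MyKryoSerializer.class);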

Best,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-8836

On Thu, Jul 4, 2019 at 5:29 PM Fabian Wollert  wrote:

> No, not yet. We lack some knowledge in understanding this. The only thing
> we found out is that it most probably happens in the Elasticsearch Sink,
> because:
> - some error messages have the sink in their stack trace.
> - when bumping the ES nodes specs on AWS, the error happens less often (we
> haven't bumped it to super large instances yet, nor got to a state where
> they go away completely. also this would not be the ideal fix)
>
> so my current assumption is that some backpressuring is not happening
> correctly. But this is super vague; any other hints or support on this are
> highly appreciated.
>
> --
>
>
> *Fabian WollertZalando SE*
>
> E-Mail: fab...@zalando.de
>
>
> Am Do., 4. Juli 2019 um 11:26 Uhr schrieb Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> Any news on this? Have you found the cause of the error?
>>
>> On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier 
>> wrote:
>>
>>> Indeed looking at StreamElementSerializer the duplicate() method could
>>> be bugged:
>>>
>>> @Override
>>> public StreamElementSerializer duplicate() {
>>>   TypeSerializer copy = typeSerializer.duplicate();
>>>   return (copy == typeSerializer) ? this : new
>>> StreamElementSerializer(copy);
>>> }
>>>
>>> Is it safe to return this when copy == typeSerializer ...?
>>>
>>> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier 
>>> wrote:
>>>
 Hi Fabian,
 we had similar errors with Flink 1.3 [1][2] and the error was caused by
 the fact that a serializer was sharing the same object with multiple
 threads.
 The error was not deterministic and was changing from time to time.
 So maybe it could be something similar (IMHO).

 [1] http://codeha.us/apache-flink-users/msg02010.html
 [2]
 http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e

 Best,
 Flavio

 On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert 
 wrote:

> additionally we have these coming with this as well all the time:
>
> com.esotericsoftware.kryo.KryoException: 
> java.lang.ArrayIndexOutOfBoundsException
> Serialization trace:
> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>   at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
>
> or
>
>
> com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
> Serialization trace:
> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at 

Re: Providing Custom Serializer for Generic Type

2019-07-04 Thread Tzu-Li (Gordon) Tai
Hi Andrea,

Is there a specific reason you want to use a custom TypeInformation /
TypeSerializer for your type?
From the description in the original post, this part wasn't clear to me.

If the only reason is because it is generally suggested to avoid generic
type serialization via Kryo, both for performance reasons as well as
evolvability in the future, then updating your type to be recognized by
Flink as one of the supported types [1] would be enough.
Otherwise, implementing your own type information and serializer is usually
only something users with very specific use cases might be required to do.
Since you are also using that type as managed state, for a safer schema
evolvability story in the future, I would recommend either Avro or Pojo as
Jingsong Lee had already mentioned.
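
For reference, a minimal sketch of what Flink's POJO rules require (field names
here are made up):

// public class, public no-argument constructor, and every field either public
// or accessible through a getter/setter -- then Flink uses PojoTypeInfo/PojoSerializer.
public class MyClass {

    public String id;           // public field: fine as-is

    private long timestamp;     // non-public field: needs getter + setter

    public MyClass() {}         // required no-arg constructor

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}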

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#flinks-typeinformation-class

On Thu, Jul 4, 2019 at 5:08 PM Andrea Spina 
wrote:

> Hi JingsongLee, thank you for your answer.
> I wanted to explore it as a last resort, honestly. Anyway, if defining
> custom serializers and types information involves quite a big effort, I
> would reconsider my guess.
>
> Cheers,
>
> Il giorno gio 4 lug 2019 alle ore 08:46 JingsongLee <
> lzljs3620...@aliyun.com> ha scritto:
>
>> Hi Andrea:
>> Why not make your *MyClass* a POJO? [1] If it is a POJO, then Flink
>> will use PojoTypeInfo and PojoSerializer, which already have a good
>> implementation.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#rules-for-pojo-types
>>
>> Best, JingsongLee
>>
>> --
>> From: Andrea Spina 
>> Send Time: Thursday, Jul 4, 2019, 14:37
>> To: user 
>> Subject:Providing Custom Serializer for Generic Type
>>
>> Dear community,
>> in my job, I run with a custom event type *MyClass* which is a sort of
>> "generic event" that I handle all along my streaming flow both as an event
>> (DataStream[MyClass]) and as a managed state.
>>
>> I see that Flink warns me about generic serialization of
>> *MyClass*
>>  INFO [run-main-0] (TypeExtractor.java:1818) - class
>> io.radicalbit.MyClass does not contain a setter for field
>> io$radicalbit$MyClass$$schema
>>  INFO [run-main-0] (TypeExtractor.java:1857) - Class class
>> io.radicalbit.MyClass cannot be used as a POJO type because not all fields
>> are valid POJO fields, and must be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance.
>>  INFO [run-main-0] (TypeExtractor.java:1818) - class
>> io.radicalbit.MyClass does not contain a setter for field
>> io$radicalbit$MyClass$schema
>>
>> So that I wanted to provide my custom serializer for MyClass, trying
>> first to register the Java one to check if the system recognizes it so I
>> followed [1] but it seems that it is not considered.
>>
>> I read then about [2] (the case is way akin to mine) and AFAIU I need to
>> implement a custom TypeInformation and TypeSerializer for my class as
>> suggested in [3] because Flink will ignore my registered serializer as long
>> as it considers my type as *generic*.
>>
>> config.registerTypeWithKryoSerializer(classOf[MyClass], 
>> classOf[RadicalSerde])
>>
>>
>> My question finally is: do I need to provide these custom classes? Is
>> there any practical example for creating custom type information like the above
>> mentioned? I have had a quick preliminary look at it, but it seems that I need
>> to provide a non-trivial amount of information to TypeInformation and
>> TypeSerializer interfaces.
>>
>> Thank you for your excellent work and help.
>>
>> Cheers.
>>
>> [1] -
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
>> [2] -
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
>> [3] -
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
>> --
>> Andrea Spina
>> Head of R @ Radicalbit Srl
>> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>>
>>
>>
>
> --
> *Andrea Spina*
> Head of R @ Radicalbit Srl
> Via Giovanni Battista Pirelli 11, 20124, Milano - IT
>


Re: [ANNOUNCE] Apache Flink 1.8.1 released

2019-07-02 Thread Tzu-Li (Gordon) Tai
Thanks for being the release manager @jincheng sun
 :)

On Wed, Jul 3, 2019 at 10:16 AM Dian Fu  wrote:

> Awesome! Thanks a lot for being the release manager. Great job! @Jincheng
>
> Regards,
> Dian
>
> On Jul 3, 2019, at 10:08 AM, jincheng sun  wrote:
>
> I've also tweeted about it from my twitter:
> https://twitter.com/sunjincheng121/status/1146236834344648704
> later it will be tweeted from @ApacheFlink!
>
> Best, Jincheng
>
Hequn Cheng  wrote on Wed, Jul 3, 2019 at 9:48 AM:
>
>> Thanks for being the release manager and the great work Jincheng!
>> Also thanks to Gorden and the community making this release possible!
>>
>> Best, Hequn
>>
>> On Wed, Jul 3, 2019 at 9:40 AM jincheng sun 
>> wrote:
>>
>>> Hi,
>>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.8.1, which is the first bugfix release for the Apache Flink
>>> 1.8 series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> https://flink.apache.org/news/2019/07/02/release-1.8.1.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12345164
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Great thanks to @Tzu-Li (Gordon) Tai  for his kind
>>> offline help!
>>>
>>> Regards,
>>> Jincheng
>>>
>>
>


Re: Restore state class not found exception in 1.8

2019-06-03 Thread Tzu-Li (Gordon) Tai
Hi Lasse,

This is indeed a bit odd. I'll need to reproduce this locally before I can
figure out the root problem. Please bear with me for a while, will get back
to you on this.

Meanwhile, you mentioned that you only had some jobs failing with the
posted exception. Did you figure out any more details on why this was only
partially happening?

Cheers,
Gordon

On Tue, May 28, 2019 at 8:59 PM Lasse Nedergaard 
wrote:

> Hi Gordon
>
> We have found a solution but not why it happens on 1.8.
> For it to work we need to call
> Env.registertype(Reportmessage.class)
>
> Reportmessage extends ReportmessageBase and the state operator uses
> ReportmessageBase.
> So we need to register all the class’s that extends a class used in state.
> Don’t know why this is needed in 1.8
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On 28 May 2019, at 10:06, Tzu-Li (Gordon) Tai  wrote:
>
> Hi Lasse,
>
> Did you move the class to a different namespace / package or change it to be
> a nested class, across the Flink versions?
> That would be the only cause I could reason about at the moment.
>
> If possible, could you also have a very minimal snippet / instructions on
> how I can maybe reproduce this?
> That might give me more insight.
>
> Cheers,
> Gordon
>
> On Mon, May 27, 2019 at 7:52 PM Lasse Nedergaard <
> lassenederga...@gmail.com> wrote:
>
>> Hi.
>>
>> When we restart some of our jobs from a savepoint we see the
>> exception below. It only happens for some of our jobs and we didn't see it
>> in 1.7.2. The class Flink can't find differs from job to job and we are sure
>> it's included in our Fat jar.
>> As a side note we are on our way to use Avro instead of POJO, but are not
>> there yet.
>> If anyone has a clue what the root cause could be, and how to resolve it,
>> that would be appreciated.
>> Thanks in advance
>>
>> Lasse Nedergaard
>>
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
>> state backend for StreamSink_609b5f7fc746f29234b038c121356a9b_(2/2) from any 
>> of the 1 provided restore options.
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:255)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
>>  ... 5 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
>> when trying to restore operator state backend
>>  at 
>> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:537)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>>  ... 7 more
>> Caused by: java.lang.RuntimeException: Cannot instantiate class.
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:384)
>>  at 
>> org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
>>  at 
>> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
>>  at 
>> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>>  ... 11 more
>> Caused by: java.lang.ClassNotFoundException: 
>> org/trackunit/tm2/formats/ReportMessage
>>  at java.lang.Class.forName0(Native Method)
>>  at java.lang.Class.forName(Class.java:348)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:382)
>>  ... 14 more
>>
>>
>>
>>
>


Re: What are savepoint state manipulation support plans

2019-05-29 Thread Tzu-Li (Gordon) Tai
FYI: Seth is starting a FLIP for adding a savepoint connector that addresses
this -
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-43-Savepoint-Connector-td29233.html

Please join the discussion there if you are interested!

On Thu, Mar 28, 2019 at 5:23 PM Tzu-Li (Gordon) Tai 
wrote:

> @Ufuk
>
> Yes, creating a JIRA now already to track this makes sense.
>
> I've proceeded to open one:
> https://issues.apache.org/jira/browse/FLINK-12047
> Let's move any further discussions there.
>
> Cheers,
> Gordon
>
> On Thu, Mar 28, 2019 at 5:01 PM Ufuk Celebi  wrote:
>
>> I think such a tool would be really valuable to users.
>>
>> @Gordon: What do you think about creating an umbrella ticket for this
>> and linking it in this thread? That way, it's easier to follow this
>> effort. You could also link Bravo and Seth's tool in the ticket as
>> starting points.
>>
>> – Ufuk
>>
>


Re: Restore state class not found exception in 1.8

2019-05-28 Thread Tzu-Li (Gordon) Tai
Hi Lasse,

Did you move the class to a different namespace / package, or change it to be
a nested class, across the Flink versions?
That would be the only cause I could reason about at the moment.

If possible, could you also provide a very minimal snippet / instructions on
how I can maybe reproduce this?
That might give me more insight.

Cheers,
Gordon

On Mon, May 27, 2019 at 7:52 PM Lasse Nedergaard 
wrote:

> Hi.
>
> When we restart some of our jobs from a savepoint we see the the exception
> below. It only happens for some of our jobs and we didn't see it in 1.7.2.
> The class Flink can't find differs from job to job and we are sure it's
> included in our fat jar.
> As a side note we are on our way to use Avro instead of POJO, but are not
> there yet.
> If anyone has a clue what the root cause could be, and how to resolve it,
> that would be appreciated.
> Thanks in advance
>
> Lasse Nedergaard
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for StreamSink_609b5f7fc746f29234b038c121356a9b_(2/2) from any 
> of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:255)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
>   ... 5 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore operator state backend
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:537)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>   ... 7 more
> Caused by: java.lang.RuntimeException: Cannot instantiate class.
>   at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:384)
>   at 
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
>   at 
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>   ... 11 more
> Caused by: java.lang.ClassNotFoundException: 
> org/trackunit/tm2/formats/ReportMessage
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:382)
>   ... 14 more
>
>
>
>


Re: Queryable State race condition or serialization errors?

2019-05-27 Thread Tzu-Li (Gordon) Tai
Hi Burgess,

Would you be able to provide a minimal project that can reproduce your
error?
That would help a lot with figuring out the issue.
If you prefer to share that only privately, please feel free to send me a
private email with the details.
Another thing you can do is set logging level to "DEBUG". We have some
checks enabled at that level to see if serializers are being concurrently
used across threads.

Cheers,
Gordon

On Tue, May 21, 2019 at 9:59 PM burgesschen  wrote:

> Hi Gary.
>
> Thanks for the reply. I am using RocksDBStateBackend though.
>
> Best,
> Chen
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Connectors (specifically Kinesis Connector)

2019-05-23 Thread Tzu-Li (Gordon) Tai
Hi Steven,

I assume you are referring to the problem that we don't publish the Kinesis
connector artifacts to Maven, due to the licensing issue with KCL?
I didn't manage to find any JIRAs that were addressing this issue directly,
but the most related one would be this:
https://issues.apache.org/jira/browse/FLINK-3924.

Cheers,
Gordon

On Tue, May 21, 2019 at 10:24 PM Steven Nelson 
wrote:

> Hello!
>
> We keep having difficulties with the Kinesis connector. We have to publish
> our own version, and we understand why. What I am curious about is the plan
> to make this better in the future. Is there an issue/FLIP that I can
> reference when talking internally about this?
>
> -Steve
>


Re: State migration into multiple operators

2019-05-14 Thread Tzu-Li (Gordon) Tai
Hi,

Just to add to what Piotr already mentioned:
The community is working on adding support for this directly in Flink.
You can follow the efforts here:
https://issues.apache.org/jira/browse/FLINK-12047.

Cheers,
Gordon


On Tue, May 14, 2019 at 11:39 AM Piotr Nowojski  wrote:

> Hi,
>
> Currently there is no native Flink support for modifying the state in such a
> manner. However, there is an ongoing effort [1] and a third-party project
> [2] to address exactly this. Both allow you to read a savepoint,
> modify it and write back the new, modified savepoint from which you can
> restore.
>
> [1] https://github.com/sjwiesman/flink/commits/savepoint-connector
> [2] https://github.com/king/bravo
>
> You can take a look at them.
>
> Piotrek
>
> On 14 May 2019, at 09:55, bastien dine  wrote:
>
> Hello,
>
> I would like to have some advice about splitting an operator with state
> into multiple operators.
> The new operators would have state containing pieces of information from the
> initial state.
> We will "split" the state.
>
> For example, I have an operator (process) with uid A, with a state containing
> field1 and field2.
> I would like to split it into two operators, B & C, with, respectively, state
> field1 and state field2.
>
> How can I split my state upon multiple operators ?
>
> Best Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
>


Re: problem with avro serialization

2019-05-14 Thread Tzu-Li (Gordon) Tai
Hi,

Aljoscha opened a JIRA just recently for this issue:
https://issues.apache.org/jira/browse/FLINK-12501.

Do you know if this is a regression from previous Flink versions?
I'm asking just to double check, since from my understanding of the issue,
the problem should have already existed before.

Thanks,
Gordon

On Sun, May 12, 2019 at 3:53 PM Debasish Ghosh 
wrote:

> Hello -
>
> Facing an issue with avro serialization with Scala case classes generated
> through avrohugger ..
> Scala case classes generated by avrohugger have the avro schema in the
> companion object. This is a sample generated class (details elided) ..
>
> case class Data(var id: Int, var name: String) extends
> org.apache.avro.specific.SpecificRecordBase {
>   def this() = this(0, "")
>   def get(field$: Int): AnyRef = {
> //..
>   }
>   def put(field$: Int, value: Any): Unit = {
> //..
>   }
>   def getSchema(): org.apache.avro.Schema = Data.SCHEMA$
> }
> object Data {
>   val SCHEMA$ = new
> org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"pipelines.flink.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}")
> }
>
> The Flink 1.8 avro serializer fails on this, as Avro looks for a SCHEMA$
> property in the class and is unable to use Java reflection to identify the
> SCHEMA$ in the companion object. The exception that I get is the
> following ..
>
> java.lang.RuntimeException: Serializing the source elements failed:
>> avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
>> org.apache.avro.AvroRuntimeException: Not a Specific class: class
>> pipelines.flink.avro.Data
>
>
> Any help or workaround will be appreciated ..
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


Re: Avro state migration using Scala in Flink 1.7.2 (and 1.8)

2019-05-14 Thread Tzu-Li (Gordon) Tai
Hi Marc!

I know we talked offline about the issues mentioned in this topic already,
but I'm just relaying the result of the discussions here to make it
searchable by others bumping into the same issues.

On Thu, Mar 21, 2019 at 4:27 PM Marc Rooding  wrote:

> Hi
>
> I’ve been trying to get state migration with Avro working on Flink 1.7.2
> using Scala case classes but I’m not getting anywhere closer to solving it.
>
> We’re using the most basic streaming WordCount example as a reference to
> test the schema evolution:
>
> val wordCountStream: DataStream[WordWithCount] = text
>   .flatMap { w => w.split("\\s") }
>   .map { w => WordWithCount(w, 1) }
>   .keyBy(_.word)
>   .reduce((a, b) => WordWithCount(a.word, a.count + b.count))
>
>
> In this example, WordWithCount is our data object that we’d like to have
> serialized and deserialized with schema evolution support since keyBy
> maintains state.
>
> I understood from the documentation that it would only work for classes
> generated from Avro schema’s so I’ve tried using sbt-avrohugger to generate
> our case classes. However, for normal case classes generated by Avro we
> quickly ran into the problem that we needed a no-arg constructor.
>
> We looked at the flink-avro module and noticed that the classes generated
> by the avro-maven-plugin were implementing SpecificRecord and seemed to
> comply with the POJO rules as described in the Flink documentation. After
> switching from normal to specific avro generation with sbt-avrohugger, we
> ended up with Scala case classes that should comply with all rules.
>
> An example of such a generated case class is as follows:
>
> /** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
> import scala.annotation.switch
>
> case class WordWithCount(var word: String, var count: Long) extends 
> org.apache.avro.specific.SpecificRecordBase {
>   def this() = this("", 0L)
>   def get(field$: Int): AnyRef = {
> (field$: @switch) match {
>   case 0 => {
> word
>   }.asInstanceOf[AnyRef]
>   case 1 => {
> count
>   }.asInstanceOf[AnyRef]
>   case _ => new org.apache.avro.AvroRuntimeException("Bad index")
> }
>   }
>   def put(field$: Int, value: Any): Unit = {
> (field$: @switch) match {
>   case 0 => this.word = {
> value.toString
>   }.asInstanceOf[String]
>   case 1 => this.count = {
> value
>   }.asInstanceOf[Long]
>   case _ => new org.apache.avro.AvroRuntimeException("Bad index")
> }
> ()
>   }
>   def getSchema: org.apache.avro.Schema = WordWithCount.SCHEMA$
> }
>
> object WordWithCount {
>   val SCHEMA$ = new 
> org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"WordWithCount\",\"fields\":[{\"name\":\"word\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"long\"}]}")
> }
>
>
> This, however, also didn’t work out of the box. We then tried to define
> our own type information using flink-avro’s AvroTypeInfo but this fails
> because Avro looks for a SCHEMA$ property (SpecificData:285) in the class
> and is unable to use Java reflection to identify the SCHEMA$ in the Scala
> companion object.
>

This is now tracked by https://issues.apache.org/jira/browse/FLINK-12501.
This is really a problem with Avro's SpecificData#getSchema() method not
working well with a specific third-party library implementation like avrohugger.
Either this is fixed in avrohugger, or we work around this by explicitly
handling the case.

> implicit val wordWithCountInfo: AvroTypeInfo[WordWithCount] = new 
> AvroTypeInfo(classOf[WordWithCount])
>
> We then read in the 1.7 documentation that Flink doesn’t natively support
> POJO types, but only state defined by descriptors, like f.e. the
> ListStateDescriptor, and only if you allow Flink to infer the type
> information. This is definitely what we need for our processors that have
> map and list state. However, for the simple word count example, we should
> only need native POJO (de)serialization with state migration.
>
> We then noticed Github PR #7759 that adds support for POJO state schema
> evolution/migration. We wanted to give this a try and built flink from
> source from the release-1.8 branch. We then included the 1.8-SNAPSHOT jars
> in our job and got a local 1.8 cluster and job running fine.
>
> However, if we do not specify our own type information, and perform the
> following steps:
>
>
>1. Run the job
>2. Create a savepoint and stop the job
>3. Update the WordWithCount avro schema to include a third field
>4. Update the job according to the generated case class
>5. Run the new job from the savepoint
>
>
> We are then faced with the following error:
>
> Caused by: java.lang.IllegalArgumentException: array is not of length 3
> thrown from ScalaCaseClassSerializer.scala:50
>

I think there is a misunderstanding here.
Scala case classes are not considered POJOs by Flink. Since 1.8, Flink
does support schema evolution for POJOs, but not for Scala case classes.

Re: State Migration with RocksDB MapState

2019-04-25 Thread Tzu-Li (Gordon) Tai
Hi Cliff,

Thanks for bringing this up again.

I think it would make sense to at least move this forward by only checking
the schema for user keys in MapState, and allowing value
schema evolution.
I'll comment on the JIRA about this, and also make it a blocker for 1.9.0
to make sure it will be resolved by then.

Concerning your question on GenericRecord:
The actual schema resolution is still performed using the Avro schema, so
you will still bump into the same issue.

Best,
Gordon

On Wed, Apr 24, 2019 at 7:45 PM Cliff Resnick  wrote:

> Hi Gordon,
>
> I noticed there has been no movement on this issue and I'm wondering if I
> can find some way to work around this.
> My MapState value is a case class container of Avro-generated
> SpecificRecords. If one SpecificRecord changes I am stuck.
>
> From the issue It seems like the blocker is around evolving the MapState
> key type.  That may be a nasty problem, but my key type is stable and will
> never change. What do you think the level of difficulty would be to add
> support for evolving only the value?
>
> Also, if I use GenericRecord instead of SpecificRecord will the need for
> schema evolution still be triggered? Or does it actually go down to the
> avro schema rather than just the class serialVersionUID?
>
>
>
>
>
>
> On Mon, Mar 18, 2019 at 1:10 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Cliff,
>>
>> Thanks for bringing this up!
>> AFAIK, this wouldn't be a long-term blocker. I've just opened a JIRA to
>> track this [1].
>>
>> As explained in the JIRA ticket, the main reason this is disallowed in
>> the initial support for state schema evolution was due to how migration was
>> implemented in the RocksDB state backend.
>> Technically speaking, enabling this in the future is definitely possible.
>>
>> Cheers,
>> Gordon
>>
>> [1]  https://issues.apache.org/jira/browse/FLINK-11947
>>
>> On Mon, Mar 18, 2019 at 11:20 AM Cliff Resnick  wrote:
>>
>>> After trying out state migration in 1.8 rc2 I ran into this hard stop
>>> below. The comment does not give an indication why rocksdb map state cannot
>>> be migrated, and I'm wondering what the status is, since we do need this
>>> functionality and would like to know if this is a long-term blocker or not.
>>> Anyone know?
>>>
>>>
>>> https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542
>>>
>>


Re: What are savepoint state manipulation support plans

2019-03-28 Thread Tzu-Li (Gordon) Tai
@Ufuk

Yes, creating a JIRA now already to track this makes sense.

I've proceeded to open one:
https://issues.apache.org/jira/browse/FLINK-12047
Let's move any further discussions there.

Cheers,
Gordon

On Thu, Mar 28, 2019 at 5:01 PM Ufuk Celebi  wrote:

> I think such a tool would be really valuable to users.
>
> @Gordon: What do you think about creating an umbrella ticket for this
> and linking it in this thread? That way, it's easier to follow this
> effort. You could also link Bravo and Seth's tool in the ticket as
> starting points.
>
> – Ufuk
>


Re: RocksDBStatebackend does not write checkpoints to backup path

2019-03-28 Thread Tzu-Li (Gordon) Tai
Hi,

Do you have the full error message of the failure?
A wild guess to begin with: have you made sure that there are sufficient
permissions to create the directory?

Best,
Gordon

On Tue, Mar 26, 2019 at 5:46 PM Paul Lam  wrote:

> Hi,
>
> I have a job (with Flink 1.6.4) which uses rocksdb incremental
> checkpointing, but the checkpointing always fails with
> `IllegalStateException`,
> because when performing `RocksDBIncrementalSnapshotOperation`, rocksdb
> finds that `localBackupDirectory`, which should be created earlier
> by rocksdb checkpoint, doesn’t exist. But there is no error message about
> failures of rocksdb checkpoint.
>
> What could possibly be the cause? Thanks a lot!
>
> Best,
> Paul Lam
>
>


Re: What are savepoint state manipulation support plans

2019-03-28 Thread Tzu-Li (Gordon) Tai
Hi!

Regarding the support for savepoint reading / writing / processing directly
in core Flink, we've been thinking about that lately and might push a bit
for adding the functionality to Flink in the next release.
For example, besides Bravo, Seth (CC'ed) has also implemented something [1]
for this. We should start thinking about converging the efforts of similar
tools and supporting it in Flink soon.
There's no official JIRA / feature proposal for this yet, but if you're
interested, please keep an eye on the dev mailing list for it in the future.

Cheers,
Gordon

[1] https://github.com/sjwiesman/flink/tree/savepoint-connector

On Thu, Mar 28, 2019 at 4:26 PM Gyula Fóra  wrote:

> Hi!
>
> I don't think there is any ongoing effort in core Flink other than this
> library we created.
>
> You are probably right that it is pretty hacky at the moment. I would say
> this is one way we could do it that seemed convenient to me at the time I
> wrote the code.
>
> If you have ideas on how to structure it better or improve it, you know
> where to find the code; feel free to open a PR :) That might actually take
> us closer to having this properly in Flink one day soon.
>
> Just to clarify the code you are showing:
> writer.writeAll() -> Runs the batch job that writes the checkpoint files
> for the changed operator states, returns the reference to the OperatorState
> metadata object
> StateMetadataUtils.createNewSavepoint() -> Replaces the metadata for the
> operator states you have just written in the previous savepoint
> StateMetadataUtils.writeSavepointMetadata() -> Writes a new metadata file
>
> So metadata writing happens as the very last step after the batch job has
> run. This is similar to how it works in streaming jobs in the sense there
> the jobmanager writes the metafile after the checkpointing is done. The
> downside of this approach is that the client might not have access to write
> the metafile here.
>
> Gyula
>
>
>


Re: State Migration with RocksDB MapState

2019-03-17 Thread Tzu-Li (Gordon) Tai
Hi Cliff,

Thanks for bringing this up!
AFAIK, this wouldn't be a long-term blocker. I've just opened a JIRA to
track this [1].

As explained in the JIRA ticket, the main reason this is disallowed in the
initial support for state schema evolution was due to how migration was
implemented in the RocksDB state backend.
Technically speaking, enabling this in the future is definitely possible.

Cheers,
Gordon

[1]  https://issues.apache.org/jira/browse/FLINK-11947

On Mon, Mar 18, 2019 at 11:20 AM Cliff Resnick  wrote:

> After trying out state migration in 1.8 rc2 I ran into this hard stop
> below. The comment does not give an indication why rocksdb map state cannot
> be migrated, and I'm wondering what the status is, since we do need this
> functionality and would like to know if this is a long-term blocker or not.
> Anyone know?
>
>
> https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542
>


Re: Flink 1.7.1 uses Kryo version 2.24.0

2019-03-15 Thread Tzu-Li (Gordon) Tai
Hi,

Currently Flink uses Kryo as the default serializer for data types that
Flink's type serialization stack doesn't support [1].
This also includes serializers being used for managed state registered by
users.

Because of this, at the moment it's not easy to upgrade the Kryo version,
since it is known to be binary incompatible across major versions [2].
Therefore, upgrading Kryo would also mean we would be breaking backwards
compatibility for Flink's savepoints between Flink minor version releases
[3], which is something the community decided to maintain as part of
Flink's backward compatibility policy.

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html#flinks-typeinformation-class

[2]  https://github.com/EsotericSoftware/kryo#kryo-versioning-and-upgrading

[3]
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#compatibility-table



On Sat, Mar 16, 2019 at 4:55 AM anaray  wrote:

> Hi ,
> Flink 1.7 still uses kryo-2.24.0. Is there any specific reason for not
> upgrading kryo?
>
> Thanks,
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Problems with restoring from savepoint

2019-03-06 Thread Tzu-Li (Gordon) Tai
Hi Pavel,

As you already discovered, this problem still occurs because in 1.7.x, the
KryoSerializer is still using the deprecated TypeSerializerConfigSnapshot
as its snapshot, which relies on the serializer being Java-serialized into
savepoints as state metadata.

In 1.8.0, all Flink's built-in serializers, including the KryoSerializer,
have been upgraded to use the new abstraction (i.e.
TypeSerializerSnapshot), which doesn't rely on Java serialization anymore.
So, essentially, you won't bump into this problem anymore after upgrading
to the upcoming 1.8.0.

Please note that this problem only fully goes away once you have a
savepoint taken with 1.8.0. When restoring from a 1.7.1 savepoint (or any
version earlier than 1.8.0), Java-deserialization of the serializer still
occurs, so you will need to keep that workaround of adding the
serialVersionUID around until you fully upgrade to 1.8.0 savepoints.
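
For other readers bumping into this: a minimal sketch of what such a pinned
serialVersionUID could look like on a custom Kryo serializer is shown below (the
class and model names are made up for illustration, not the actual code in this
thread):

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

@SerialVersionUID(1L) // pinned explicitly instead of relying on a compiler-generated UID
class ProtobufKryoSerializer extends Serializer[MyProtoModel] with Serializable {

  override def write(kryo: Kryo, output: Output, obj: MyProtoModel): Unit = {
    val bytes = obj.toByteArray // assumes a protobuf-generated model class
    output.writeInt(bytes.length)
    output.writeBytes(bytes)
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[MyProtoModel]): MyProtoModel = {
    val length = input.readInt()
    MyProtoModel.parseFrom(input.readBytes(length)) // hypothetical parseFrom of the model
  }
}

With the UID pinned, the Java-serialized form of the serializer that 1.7.x embeds
in the savepoint metadata stays stable across otherwise-compatible code changes.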

I think the first release candidate for 1.8.0 will be available soon.
Would be interesting if you can try that out and let me know how this works
out for you with the release candidate!

Cheers,
Gordon

On Wed, Mar 6, 2019 at 5:06 PM Pavel Potseluev 
wrote:

> Hi!
>
> We use flink-1.7.1 and have some problems with restoring from savepoint.
> We use a custom kryo serializer which relies on the protobuf representation of
> our model classes. It had been working fine, but when we made some change in
> our model class it broke because of a changed serialVersionUID. We can see
> this message in the log:
>
>
> Caused by: java.lang.IllegalStateException: Could not Java-deserialize
> TypeSerializer while restoring checkpoint metadata for serializer snapshot
> 'org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshot'.
> Please update to the TypeSerializerSnapshot interface that removes Java
> Serialization to avoid this problem in the future.
>
>
> I found that method *snapshotConfiguration* of
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer returns
> instance of KryoSerializerConfigSnapshot. And this class for some reason
> extends deprecated TypeSerializerConfigSnapshot which relies on java
> serialization.
>
>
> Of course, we have fixed our problem just by adding a special
> serialVersionUID to our class. But it seems strange to have problems with
> java serialization while our serializer doesn't use this mechanism. Do you
> plan to fix this problem?
>
> Full stack trace below:
>
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for StreamGroupedReduce_5d41c2bc0b6f18591a40bd21a3e516cd_(2/2) 
> from any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>   ... 5 more
> Caused by: java.lang.IllegalStateException: Could not Java-deserialize 
> TypeSerializer while restoring checkpoint metadata for serializer snapshot 
> 'org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshot'.
>  Please update to the TypeSerializerSnapshot interface that removes Java 
> Serialization to avoid this problem in the future.
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
>   at 
> org.apache.flink.runtime.state.StateSerializerProvider$RestoredStateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:212)
>   at 
> org.apache.flink.runtime.state.StateSerializerProvider$RestoredStateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:188)
>   at 
> org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:135)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.getStateSerializer(CopyOnWriteStateTable.java:541)
>   at 
> 

Re: ProducerFencedException when running 2 jobs with FlinkKafkaProducer

2019-02-18 Thread Tzu-Li (Gordon) Tai
Hi,

I just saw a JIRA opened for this:
https://issues.apache.org/jira/browse/FLINK-11654.

The JIRA ticket's description matches what I had in mind and can confirm
the bug assessment. Unfortunately, I currently do not have the capacity to
provide a fix and test for this.
For the meantime, I've made this a blocker for releasing 1.8.0. It would be
great if someone can try out the proposed fix mentioned in the JIRA, see if
it fixes the issue in your cases, and provide a PR for the patch.

Thanks,
Gordon

On Tue, Feb 19, 2019 at 9:46 AM Rohan Thimmappa 
wrote:

> Hi Tzu-Li,
>
> Any update on this? This is consistently reproducible.
>
> Same jar - separate source topic to separate destination topic.
>
> This is sort of a blocker for the Flink upgrade. I tried with 1.7.2 but no luck.
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to 
> send data to Kafka: Producer attempted an operation with an old epoch. Either 
> there is a newer producer with the same transactionalId, or the producer's 
> transaction has been expired by the broker.
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:98)
>   at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:33)
>   at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>   at 
> org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>
>
>
>
>
> Rohan
>
> On Wed, Feb 13, 2019 at 12:33 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> I think this is unexpected. The generated transactional ids should not be
>> clashing.
>> Looking at the FlinkKafkaProducer code, it seems like the generation is
>> only a function of the subtask id of the FlinkKafkaProducer, which could be
>> the same across 2 different Kafka sources.
>>
>> I'm not completely certain about this. Piotr (in CC) might have more
>> insights for this.
>>
>> Cheers,
>> Gordon
>>
>> On Wed, Feb 13, 2019 at 9:15 AM Slotterback, Chris <
>> chris_slotterb...@comcast.com> wrote:
>>
>>> Hey all,
>>>
>>

[ANNOUNCE] Apache Flink 1.7.2 released

2019-02-17 Thread Tzu-Li (Gordon) Tai
Hi,

The Apache Flink community is very happy to announce the release of
Apache Flink 1.7.2, which is the second bugfix release for the Apache
Flink 1.7 series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html


Please check out the release blog post for an overview of the
improvements for this bugfix release:
https://flink.apache.org/news/2019/02/15/release-1.7.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344632

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Gordon


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-13 Thread Tzu-Li (Gordon) Tai
Hi,

@Averell  I renamed the
`ElasticsearchFailureHandlerIndexer` to be `BufferingNoOpRequestIndexer`,
which explains why you can't find it.

The voting thread for RC#1 of 1.7.2 can be found at [1].
The actual commits which fixes the problem are d9c45af to 2f52227.

Cheers,
Gordon

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-7-2-release-candidate-1-td26882.html

On Thu, Feb 14, 2019 at 9:38 AM Ken Krugler 
wrote:

> Hi Averell,
>
>
> https://github.com/apache/flink/commit/35af99391dac431c85e30bcc98b89cba79bccfea#diff-51a12ea54593424e195dd5874309a08d
>
> …is the commit where Gordon made his changes for FLINK-11046
> .
>
> The ElasticsearchFailureHandlerIndexer class was removed as part of the
> commit.
>
> — Ken
>
>
> On Feb 13, 2019, at 4:46 PM, Averell  wrote:
>
> Hi Ken,
>
> Thanks for that. But I could not find the changes included in Gordon's
> mentioned pull request in the repository you gave me (e.g: the new class
> /ElasticsearchFailureHandlerIndexer/).
> I have found this folder
> https://dist.apache.org/repos/dist/dev/flink/flink-1.7.2-rc1/, but it also
> doesn't have that new class.
> Maybe Gordon meant 1.7.2 rc2?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


Re: How does everyone learn Flink?

2019-02-13 Thread Tzu-Li (Gordon) Tai
Hi,

Besides the official Apache Flink documentation [1], I would personally also suggest taking a look at Ververica's series of Flink training material:
https://training.ververica.com/
Beyond that, if you run into any questions while learning, you are also welcome to write to us directly.

- Gordon

[1] https://flink.apache.org/

On Thu, Feb 14, 2019 at 11:44 AM shen lei  wrote:

> Does anyone have good experience or methods to share? Thanks. I started learning recently, and it still feels rather unsystematic.


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-13 Thread Tzu-Li (Gordon) Tai
Thanks for testing it out.
Will be great to get your feedback on whether the release candidate for
1.7.2 fixes this for you.

On Wed, Feb 13, 2019 at 7:38 PM Averell  wrote:

> Thank you Gordon.
>
> That's my exact  problem. Will try the fix in 1.7.2 now.
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: error while querying state

2019-02-12 Thread Tzu-Li (Gordon) Tai
Hi,

Which Flink version are you using?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ProducerFencedException when running 2 jobs with FlinkKafkaProducer

2019-02-12 Thread Tzu-Li (Gordon) Tai
Hi,

I think this is unexpected. The generated transactional ids should not be
clashing.
Looking at the FlinkKafkaProducer code, it seems like the generation is
only a function of the subtask id of the FlinkKafkaProducer, which could be
the same across 2 different Kafka sources.

I'm not completely certain about this. Piotr (in CC) might have more
insights for this.

Cheers,
Gordon

On Wed, Feb 13, 2019 at 9:15 AM Slotterback, Chris <
chris_slotterb...@comcast.com> wrote:

> Hey all,
>
>
>
> I am running into an issue where if I run 2 Flink jobs (same jar,
> different configuration) that produce to different Kafka topics on the
> same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE
> semantics, both jobs go into a checkpoint exception loop every 15 seconds
> or so:
>
>
>
> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
> Producer attempted an operation with an old epoch. Either there is a newer
> producer with the same transactionalId, or the producer's transaction has
> been expired by the broker.
>
>
>
> As soon as one of the jobs is cancelled, things go back to normal for the
> other job. I tried manually setting the TRANSACTIONAL_ID_CONFIG config in
> the producer to be unique for each of the jobs. My producer transaction
> timeout is set to 5 minutes, and flink checkpointing is set to 1 minute. Is
> there some way to prevent these jobs from tripping over each other in
> execution while retaining exactly once processing?
>


Re: Flink 1.6 Yarn Session behavior

2019-02-12 Thread Tzu-Li (Gordon) Tai
Hi,

I'm forwarding this question to Gary (CC'ed), who most likely would have an
answer for your question here.

Cheers,
Gordon

On Wed, Feb 13, 2019 at 8:33 AM Jins George  wrote:

> Hello community,
>
> I am trying to  upgrade a  Flink Yarn session cluster running BEAM
> pipelines  from version 1.2.0 to 1.6.3.
>
> Here is my session start command: yarn-session.sh -d *-n 4*  -jm 1024 -tm
> 3072 *-s 7*
>
> Because of the dynamic resource allocation,  no taskmanager gets created
> initially. Now once I submit a job with parallelism 5, I see that 1
> task-manager gets created and all 5 parallel instances are scheduled on the
> same taskmanager (because I have 7 slots). This can create a hot spot as
> only one physical node (out of 4 in my case) is utilized for processing.
>
> I noticed the legacy mode, which would provision all task managers at
> cluster creation, but since legacy mode is expected to go away soon, I
> didn't want to try that route.
>
> Is there a way I can configure multiple jobs or parallel instances of the
> same job to spread across all the available Yarn nodes and continue using
> the 'new' mode?
>
> Thanks,
>
> Jins George
>


Re: How to register TypeInfoFactory for 'external' class

2019-02-12 Thread Tzu-Li (Gordon) Tai
Hi Alexey,

I don't think using the @TypeInfo annotation is doable at the moment.

Is this class being used only for input / output types of functions /
operators?
Or are you using it as a state type?

For the former, I think you can explicitly set the TypeInformation by
calling setTypeInfo on transformations.
For the latter, you can provide your own TypeInformation / TypeSerializer
for the state when declaring the StateDescriptor for the state.
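
For the latter case, a minimal sketch could look like the following (the
'ExternalClass' name and the way its TypeInformation is obtained are assumptions;
you could build it from your TypeInfoFactory or by hand):

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation

// TypeInformation for the external class, built explicitly -- assumed to exist
val externalTypeInfo: TypeInformation[ExternalClass] = ???

val descriptor = new ValueStateDescriptor[ExternalClass]("external-state", externalTypeInfo)
// inside a rich function: val state = getRuntimeContext.getState(descriptor)

Since the descriptor already carries the TypeInformation, Flink's type extraction
(and therefore the @TypeInfo annotation) is not involved for this state.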

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: In-Memory state serialization with kryo fails

2019-02-12 Thread Tzu-Li (Gordon) Tai
Hi,

I would suggest avoiding Kryo for state serialization, especially if this
job is meant for production usage.
It might get in the way in the future when you decide to upgrade your
value state schema.

To do that, when declaring the descriptor for your MapState, provide a
specific serializer for your value ( *java.util.List[SomeClass[_]]*  ).
You should be able to use Flink's ListSerializer for this. By providing a
specific serializer, this bypasses Flink's type extraction for your state
which determines to use the KryoSerializer as a fallback for unrecognizable
types.
You can find more information about custom state serialization here [1].
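
As an illustration, a minimal sketch of such a descriptor could look like the
following (the element serializer for SomeClass is an assumption and has to be
supplied on your side, e.g. your own TypeSerializer implementation):

import java.util.{List => JList}

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.typeutils.base.{ListSerializer, StringSerializer}

// serializer for a single SomeClass element -- assumed to exist in your code
val elementSerializer: TypeSerializer[SomeClass[_]] = ???

val descriptor = new MapStateDescriptor[String, JList[SomeClass[_]]](
  "progress-state",                                    // state name
  StringSerializer.INSTANCE,                           // key serializer: java.lang.String
  new ListSerializer[SomeClass[_]](elementSerializer)  // value serializer: java.util.List[SomeClass[_]]
)

With the serializers given explicitly like this, Flink never needs to fall back
to Kryo for the java.util.List value.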

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/custom_serialization.html

On Wed, Feb 13, 2019 at 2:56 AM Rinat  wrote:

> Hi mates !
>
> I’ve implemented a job, that stores it’s progress using *MapState[K, V]*,
> where *K* - is *java.lang.String*, and *V* - is a collection of some
> typed objects *java.util.List[SomeClass[_]]*
> When Flink is trying to serialize this state, it is using kryo serializer
> for value object and fails with *StackOverflowException*
>
> *java.lang.StackOverflowError*
> * at java.util.HashMap.hash(HashMap.java:338)*
> * at java.util.HashMap.get(HashMap.java:556)*
> * at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:43)*
>
> This problem is related to a known bug in *kryo* (
> https://github.com/EsotericSoftware/kryo/issues/341), and reveals itself
> only when the type of *SomeClass* is a *java.util.BitSet*.
>
> I’ve checked my job locally (from IDE) with latest (4.0.2
> )
> kryo lib, and it works fine, but I couldn’t change kryo version for
> distributed mode, because it’s packaged into fat-jar
> (flink-dist_2.11-1.6.1.jar), that
> contains all runtime dependencies for Flink.
>
> Maybe you can give me some advice on how to solve this issue, or on how to
> register separate serializers for this case?
>
> Thx for your help.
>
>
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
>
> CleverDATA
> make your data clever
>
>


Re: Help with a stream processing use case

2019-02-10 Thread Tzu-Li (Gordon) Tai
Hi,

If Firehose already supports sinking records from a Kinesis stream to an
S3 bucket, then yes, Chesnay's suggestion would work.
You route each record to a specific Kinesis stream depending on some value
in the record using the KinesisSerializationSchema, and sink each Kinesis
stream to its target S3 bucket.

Another obvious approach is to use side output tags in the Flink job to
route records to different streaming file sinks that write to their own S3
buckets, but that would require knowing the target S3 buckets upfront.
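
For the first approach (routing via the KinesisSerializationSchema), a minimal
sketch could look like the following (the 'Event' type, its 'category' field, the
stream naming scheme and the region are all assumptions, not part of your actual
setup):

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema

case class Event(category: String, payload: String)

val schema = new KinesisSerializationSchema[Event] {
  override def serialize(element: Event): ByteBuffer =
    ByteBuffer.wrap(element.payload.getBytes(StandardCharsets.UTF_8))

  // returning a stream name here routes the record; the default stream is used otherwise
  override def getTargetStream(element: Event): String =
    s"events-${element.category}"
}

val producerConfig = new Properties()
producerConfig.setProperty("aws.region", "us-east-1")

val producer = new FlinkKinesisProducer[Event](schema, producerConfig)
producer.setDefaultStream("events-default")
producer.setDefaultPartition("0")

Each target stream can then be wired to its own Firehose delivery / S3 bucket on
the AWS side.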

Cheers,
Gordon

On Sun, Feb 10, 2019 at 5:42 PM Chesnay Schepler  wrote:

> I'll need someone else to chime in here for a definitive answer (cc'd
> Gordon), so I'm really just guessing here.
>
> For the partitioning it looks like you can use a custom partitioner, see
> FlinkKinesisProducer#setCustomPartitioner.
> Have you looked at the KinesisSerializationSchema described in the
> documentation
> ?
> It allows you to write to a specific stream based on incoming events, but
> I'm not sure whether this translates to S3 buckets and keyspaces.
>
> On 08.02.2019 16:43, Sandybayev, Turar (CAI - Atlanta) wrote:
>
> Hi all,
>
>
>
> I wonder whether it’s possible to use Flink for the following requirement.
> We need to process a Kinesis stream and based on values in each record,
> route those records to different S3 buckets and keyspaces, with support for
> batching up of files and control over partitioning scheme (so preferably
> through Firehose).
>
>
>
> I know it’s straightforward to have a Kinesis source and a Kinesis sink,
> and then hook up Firehose to the sink from AWS, but I need a "fan out" to
> potentially thousands of different buckets, based on content of each event.
>
>
>
> Thanks!
>
> Turar
>
>
>
>
>
>
>


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-09 Thread Tzu-Li (Gordon) Tai
Hi Averell,

This seems to be the bug that you encountered:
https://issues.apache.org/jira/browse/FLINK-11046.

Cheers,
Gordon

On Sat, Feb 9, 2019 at 3:27 PM Averell  wrote:

> Hello,
>
> I am trying to follow this Flink guide [1] to handle errors in
> ElasticSearchSink by re-adding the failed messages to the queue.
> The error scenarios that I got and am going to retry are: (i) a conflict in the
> UpdateRequest document version and (ii) a lost connection to ElasticSearch.
> These errors are expected to be non-persistent, and would (i) be solved by
> changing the version / (ii) be gone after some seconds.
> What I expect is that the message gets retried successfully.
> What I actually got was: Flink seemed to get stuck on that (first) retry,
> my
> flow queued up (backpressure is 1 everywhere), all processing hung.
>
> Here is my error handling code:
>
> 
> private object MyElasticSearchFailureHandler extends
> ActionRequestFailureHandler {
> override def onFailure(actionRequest: ActionRequest,
> failure: Throwable,
> restStatusCode: Int, indexer: RequestIndexer): Unit = {
> if
> (ExceptionUtils.findThrowableWithMessage(failure,
> "version_conflict_engine_exception") != Optional.empty()) {
> actionRequest match {
> case s: UpdateRequest =>
> LOG.warn(s"Failed
> inserting record to ElasticSearch due to version
> conflict (${s.version()}). Retrying")
>
> LOG.warn(actionRequest.toString)
>
> indexer.add(s.version(s.version() + 1))
> case _ =>
> LOG.error("Failed
> inserting record to ElasticSearch due to version
> conflict. However, this is not an Update-Request. Don't know why.")
>
> LOG.error(actionRequest.toString)
> throw failure
> }
> } else if (restStatusCode == -1 &&
> failure.getMessage.contains("Connection closed")) {
> LOG.warn(s"Retrying record:
> ${actionRequest.toString}")
> actionRequest match {
> case s: UpdateRequest =>
> indexer.add(s)
> case s: IndexRequest =>
> indexer.add(s)
> }
> } else {
> LOG.error(s"ELASTICSEARCH FAILED:\n
> statusCode $restStatusCode\n
> message: ${failure.getMessage}\n${failure.getStackTrace}")
> LOG.error(s"DATA:\n
> ${actionRequest.toString}")
> throw failure
> }
> }
> }
> 
>
> Here is the extract from my task-manager logs:
>
> /2019-02-09 04:12:35.676 [I/O dispatcher 25] ERROR
> o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed
> Elasticsearch bulk request: Connection closed
> 2019-02-09 04:12:35.678 [I/O dispatcher 25] WARN
> c.n.c..sink.MyElasticSearchSink$  - Retrying record: update
> {[idx-20190208][_doc][doc_id_154962270], doc_as_upsert[true], doc[index
> {*[null][null][null]*, source[{...}]}], scripted_upsert[false],
> detect_noop[true]}
> 2019-02-09 04:12:54.242 [Sink: S3 - Historical (1/4)] INFO
> o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
> checkpointing for checkpoint with id=24 (max part counter=26)./
>
> And job-manager logs:
> /2019-02-09 03:59:37.880 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 23 for job 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 bytes in
> 307078 ms).
> 2019-02-09 04:09:30.970 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
> checkpoint 24 @ 1549685370776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
> 2019-02-09 04:17:00.970 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 24
> of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
> 2019-02-09 04:24:31.035 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
> checkpoint 25 @ 1549686270776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
> 2019-02-09 04:32:01.035 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 25
> of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
> 2019-02-09 04:39:30.961 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
> checkpoint 26 @ 1549687170776 for job 1a1438ca23387c4ef9a59ff9da6dafa1./
>
> Thanks and best regards,
> Averell
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests
> <
> 

Re: Checkpoints failed with NullPointerException

2019-01-29 Thread Tzu-Li (Gordon) Tai
Hi!

Thanks for reporting this.

This looks like a bug that we fixed in Flink 1.7.1 [1].

Would you be able to try with 1.7.1 and see if the issue is still happening
for you?

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-11094

On Tue, Jan 29, 2019, 6:29 PM Averell  wrote:

> I tried to create a savepoint on HDFS, and got the same exception:
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 028e392d02bd229ed08f50a2da5227e2 failed.
> at
>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
> at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
> at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint
> failed: Could not perform checkpoint 35 for operator Merge sourceA
> (7/16).
> at
>
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$14(JobMaster.java:970)
> at
>
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
>
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
>
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortWithCause(PendingCheckpoint.java:452)
> at
>
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:447)
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1258)
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failUnacknowledgedPendingCheckpointsFor(CheckpointCoordinator.java:918)
> at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyExecutionChange(ExecutionGraph.java:1779)
> at
>
> org.apache.flink.runtime.executiongraph.ExecutionVertex.notifyStateTransition(ExecutionVertex.java:756)
> at
>
> org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1353)
> at
>
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1113)
> at
>
> org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:945)
> at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1576)
> at
>
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
> at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
> at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
>
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   

Re: getting duplicate messages from duplicate jobs

2019-01-28 Thread Tzu-Li (Gordon) Tai
Hi,

Yes, Dawid is correct.

The "group.id" setting in Flink's Kafka Consumer is only used for group
offset fetching and committing offsets back to Kafka (only for exposure
purposes, not used for processing guarantees).
The Flink Kafka Consumer uses static partition assignment on the
KafkaConsumer API, and not consumer group-based automatic partition
assignments.

Cheers,
Gordon

On Sun, Jan 27, 2019 at 12:28 AM Dawid Wysakowicz 
wrote:

> Forgot to cc Gordon :)
>
> On 23/01/2019 18:02, Avi Levi wrote:
> > Hi,
> > This is quite confusing.
> > I submitted the same stateless job twice (actually I uploaded it once).
> > However, when I place a message on kafka, it seems that both jobs
> > consume it and publish the same result (we publish the result to
> > another kafka topic, so I actually see the message duplicated on kafka).
> > How can it be? Both jobs are using the same group id (the group id is
> > fixed and not generated).
> >
> > Kind regards
> > Avi
>
>


Re: Sampling rate higher than 1Khz

2019-01-28 Thread Tzu-Li (Gordon) Tai
Hi!

Yes, Flink's watermark timestamps are in milliseconds, which means that
time-based operators such as time window operators will be fired at a
per-millisecond granularity.
Whether or not this introduces "latency" in the pipeline depends on the
granularity of your time window operations; if you need to have window
durations shorter than 1 millisecond, then yes, having only millisecond
watermarks will introduce latency.
Currently in Flink, time-based operations such as windows / registering
timers are all done at millisecond accuracy.

Cheers,
Gordon

On Mon, Jan 28, 2019 at 7:55 PM Nicholas Walton  wrote:

> Flink's watermarks are in milliseconds. I have time sampled off a sensor at
> a rate exceeding 1 kHz, or 1 per millisecond. Is there a way to handle
> timestamp granularity below milliseconds, or will I have to generate a
> timestamp for the millisecond value preceding that associated with the
> sensor reading, which IUC will introduce latency into the processing
> pipeline?
>
> TIA


Re: How Flink prioritise read from kafka topics and partitions ?

2019-01-28 Thread Tzu-Li (Gordon) Tai
Hi Sohi!

On Wed, Jan 23, 2019 at 9:01 PM sohimankotia  wrote:

> Hi,
>
> Let's say I have a Flink Kafka consumer reading from 3 topics, [T-1, T-2, T-3].
>
> - T1 and T2 are having partitions equal to 100
> - T3 is having partitions equal to 60
> - Flink Task (parallelism is 50)
>

There isn't any means of prioritizing some partitions / topics to be read
first before others in the Flink Kafka Consumer.


>
> How will Flink prioritize Kafka topics?
>
> If T-3 has more lag than the other topics, will Flink give higher priority to
> T-3
>

No. However, there is some relevant discussion that would be helpful for
these situations. Please see [1].
Long story short, if sources are aligned by event-time, the partitions that
are lagging behind (in event-time) will slow down other source subtasks
until the lagging partition has caught up.
This is a feature that is still in discussion.

Cheers,
Gordon

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952.html


> ?
>
>
> Thanks and Regards
> Sohi
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-28 Thread Tzu-Li (Gordon) Tai
Thanks Peter! Yes, it would also be great if you try the patch in
https://github.com/apache/flink/pull/7580 out and see if that works for you.

On Mon, Jan 28, 2019 at 7:47 PM pwestermann 
wrote:

> Hi Gordon,
>
> We should be able to wait for 1.7.2 but I will also test the workaround and
> post if I run into further issues.
>
> Thanks a lot!
> Peter
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-28 Thread Tzu-Li (Gordon) Tai
Hi,

Thanks for all the information and reporting this.
We've identified this to be an actual issue:
https://issues.apache.org/jira/browse/FLINK-11436.

There's also a PR opened to fix this, and is currently under review:
https://github.com/apache/flink/pull/7580.
I'll make sure that this is fixed for the next bugfix release for 1.7.x
(i.e. 1.7.2).

For the time being, if waiting for 1.7.2 isn't an option, to work around this
on 1.7.1 you would have to have a copy of the AvroSerializer class, under
the same package namespace in your user code, but with its serialVersionUID
changed to 1.
Also, you'll need to exclude Flink's original AvroSerializer class from
flink-avro.
You would be able to remove that workaround and extra AvroSerializer class
once you upgrade to 1.7.2.

Thanks,
Gordon

On Thu, Jan 24, 2019 at 8:37 PM pwestermann 
wrote:

> I ran `mvn dependency:tree` and only see 1.7.1 dependencies for Flink:
>
> [INFO] com.inin.analytics:analytics-flink:jar:0.0.1-SNAPSHOT
> [INFO] +- org.apache.flink:flink-streaming-java_2.11:jar:1.7.1:provided
> [INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.7.1:provided
> [INFO] |  |  +-
> org.apache.flink:flink-queryable-state-client-java_2.11:jar:1.7.1:provided
> [INFO] |  |  +- org.apache.flink:flink-hadoop-fs:jar:1.7.1:provided
> [INFO] |  |  +- commons-io:commons-io:jar:2.4:compile
> [INFO] |  |  +-
> org.apache.flink:flink-shaded-netty:jar:4.1.24.Final-5.0:provided
> [INFO] |  |  +- org.apache.flink:flink-shaded-asm:jar:5.0.4-5.0:provided
> [INFO] |  |  +-
> org.apache.flink:flink-shaded-jackson:jar:2.7.9-5.0:provided
> [INFO] |  |  +- org.javassist:javassist:jar:3.19.0-GA:provided
> [INFO] |  |  +- org.scala-lang:scala-library:jar:2.11.12:compile
> [INFO] |  |  +- com.typesafe.akka:akka-actor_2.11:jar:2.4.20:provided
> [INFO] |  |  |  +- com.typesafe:config:jar:1.3.0:provided
> [INFO] |  |  |  \-
> org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:provided
> [INFO] |  |  +- com.typesafe.akka:akka-stream_2.11:jar:2.4.20:provided
> [INFO] |  |  |  \- com.typesafe:ssl-config-core_2.11:jar:0.2.1:provided
> [INFO] |  |  | \-
> org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:provided
> [INFO] |  |  +- com.typesafe.akka:akka-protobuf_2.11:jar:2.4.20:provided
> [INFO] |  |  +- com.typesafe.akka:akka-slf4j_2.11:jar:2.4.20:provided
> [INFO] |  |  +- org.clapper:grizzled-slf4j_2.11:jar:1.3.2:provided
> [INFO] |  |  +- com.github.scopt:scopt_2.11:jar:3.5.0:provided
> [INFO] |  |  +- org.xerial.snappy:snappy-java:jar:1.1.4:compile
> [INFO] |  |  \- com.twitter:chill_2.11:jar:0.7.6:provided
> [INFO] |  | \- com.twitter:chill-java:jar:0.7.6:provided
> [INFO] |  +- org.apache.flink:flink-shaded-guava:jar:18.0-5.0:provided
> [INFO] |  +- org.apache.commons:commons-math3:jar:3.5:compile
> [INFO] |  \- org.apache.flink:force-shading:jar:1.7.1:compile
> [INFO] +- org.apache.flink:flink-clients_2.11:jar:1.7.1:provided
> [INFO] |  +- org.apache.flink:flink-core:jar:1.7.1:provided
> [INFO] |  |  +- org.apache.flink:flink-annotations:jar:1.7.1:provided
> [INFO] |  |  +- org.apache.flink:flink-metrics-core:jar:1.7.1:provided
> [INFO] |  |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:provided
> [INFO] |  |  |  +- com.esotericsoftware.minlog:minlog:jar:1.2:provided
> [INFO] |  |  |  \- org.objenesis:objenesis:jar:2.1:provided
> [INFO] |  |  +- commons-collections:commons-collections:jar:3.2.2:provided
> [INFO] |  |  \- org.apache.commons:commons-compress:jar:1.4.1:compile
> [INFO] |  +- org.apache.flink:flink-optimizer_2.11:jar:1.7.1:provided
> [INFO] |  +- org.apache.flink:flink-java:jar:1.7.1:provided
> [INFO] |  \- commons-cli:commons-cli:jar:1.3.1:provided
> [INFO] +- org.apache.flink:flink-avro:jar:1.7.1:compile
> [INFO] |  \- org.apache.avro:avro:jar:1.8.2:compile
> [INFO] | +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
> [INFO] | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
> [INFO] | +- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
> [INFO] | \- org.tukaani:xz:jar:1.5:compile
> [INFO] +-
> org.apache.flink:flink-statebackend-rocksdb_2.11:jar:1.7.1:provided
> [INFO] |  \- org.rocksdb:rocksdbjni:jar:5.7.5:provided
> [INFO] +-
> org.apache.flink:flink-connector-kafka-0.11_2.11:jar:1.7.1:compile
> [INFO] |  +-
> org.apache.flink:flink-connector-kafka-0.10_2.11:jar:1.7.1:compile
> [INFO] |  |  \-
> org.apache.flink:flink-connector-kafka-0.9_2.11:jar:1.7.1:compile
> [INFO] |  | \-
> org.apache.flink:flink-connector-kafka-base_2.11:jar:1.7.1:compile
> [INFO] |  \- org.apache.kafka:kafka-clients:jar:0.11.0.2:compile
> [INFO] | \- net.jpountz.lz4:lz4:jar:1.3.0:compile
> [INFO] +- org.apache.flink:flink-s3-fs-presto:jar:1.7.1:provided
>
> I also tried this again with debug logging enabled but didn't see any more
> messages that would explain the failure.
> To me, the error message
> (org.apache.flink.formats.avro.typeutils.AvroSerializer; local class
> incompatible: stream classdesc 

Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-24 Thread Tzu-Li (Gordon) Tai
Hi!

We've double checked the code, and the only plausible cause of this is that
you may be using flink-avro 1.6.x with Flink 1.7.x.
Could you double check that all Flink dependencies, including flink-avro,
are 1.7.1?
You can verify this by doing `mvn dependency:tree` on your job, and check
that flink-avro 1.6.x isn't in there.

A more detailed explanation of why we suspect this:
In Flink 1.7.x, the job will only fail if a previously Java-serialized
serializer that couldn't be deserialized during the restore is actually
attempted to be used.
In flink-avro 1.7.x, we've made sure that the previous serialized
AvroSerializer instance (which is expected to no longer be deserializable
in 1.7.1) is never accessed. This isn't the case for flink-avro 1.6.x,
which still attempts to access the restored AvroSerializer instance.

Please update us on your verifications here. And thanks for the effort!

Cheers,
Gordon

On Wed, Jan 23, 2019 at 8:41 PM pwestermann 
wrote:

> Thanks Gordon,
>
> I get the same exception in the JM logs and that looks like it's causing
> the
> job failure.
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-23 Thread Tzu-Li (Gordon) Tai
Thanks for the logs.

Is the job restore actually failing? If yes, there should be an exception
for the exact cause of the failure.

Otherwise, the AvroSerializer warnings in the taskmanager logs are actually
expected behaviour when restoring from savepoints taken before 1.7.x, and
shouldn't cause job failures (unless something unexpected is happening).
Shortly put, to describe the cause of that warning:
Previously in 1.6.x, the AvroSerializer was Java-serialized into savepoints.
In 1.7.x, when restoring from previous version savepoints, that serializer
will still be attempted to be read using Java serialization (which explains
the InvalidClassException in the WARN log).
However, starting from 1.7 we no longer rely on serializers being written
directly into savepoints, so whether or not reading that serializer was
successful should not matter and the restore should proceed normally.

Please do let me know if the job is actually failing, then we should
investigate further. If it is failing, there should be an exception in the
JM logs identifying the cause of job failure.
CC'ing Igal, as he worked on the AvroSerializer for 1.7.x and might have
more info.

Cheers,
Gordon

On Wed, Jan 23, 2019 at 7:42 PM pwestermann 
wrote:

> There is not much in the log as this immediately happens when I start the
> job. I attached one of the taskmanager logs. The first error message I see
> is  /Could not read a requested serializer. Replaced with a
> UnloadableDummyTypeSerializer./ and the exception is
>
>
> taskmanager.log
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1547/taskmanager.log>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-23 Thread Tzu-Li (Gordon) Tai
Hi,

Thanks for reporting this.

Could you provide more details (error message, exception stack trace) that
you are getting?
This is unexpected, as the changes to flink-avro serializers in 1.7.x
should be backwards compatible.
More details on how the restore failed will be helpful here.

Cheers,
Gordon


On Wed, Jan 23, 2019 at 2:54 PM pwestermann 
wrote:

> I am trying to migrate from Flink 1.6.3 to 1.7.1 but am not able to restore
> the job from a savepoint taken in 1.6.3.
>
> We are using an AsyncFunction to publish Avro records to SQS. The state for
> the AsyncWaitOperator cannot be restored because of serializer changes in
> flink-avro from 1.6.3 to 1.7.1.
>
> Any idea how to avoid this problem? Maybe start the job with flink-avro
> 1.6.3 or will that break other parts?
>
> Thanks,
> Peter
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Custom Serializer for Avro GenericRecord

2019-01-10 Thread Tzu-Li (Gordon) Tai
Hi,

Have you looked at [1]?
You can annotate your type and provide a type info factory. The factory
would be used to create the TypeInformation for that type, and in turn
create the serializer used for that type.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#defining-type-information-using-a-factory
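
For illustration, a minimal sketch of that pattern (the MyEvent type is made
up, and the POJO type information returned here is only a placeholder — your
factory would return whatever TypeInformation creates the custom serializer
you actually want to use for your Avro records):

```
import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// The annotation tells Flink to obtain the TypeInformation (and thus the
// serializer) for this type from the given factory instead of analyzing it.
@TypeInfo(MyEvent.MyEventTypeInfoFactory.class)
public class MyEvent {
    public String id;
    public long timestamp;

    public static class MyEventTypeInfoFactory extends TypeInfoFactory<MyEvent> {
        @Override
        public TypeInformation<MyEvent> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            // Placeholder: return the TypeInformation whose serializer should be
            // used for MyEvent, e.g. one wrapping your custom Avro-based serializer.
            return Types.POJO(MyEvent.class);
        }
    }
}
```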



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


[ANNOUNCE] Apache Flink 1.6.3 released

2018-12-23 Thread Tzu-Li (Gordon) Tai
Hi,

The Apache Flink community is very happy to announce the release of 
Apache Flink 1.6.3, which is the third bugfix release for the Apache 
Flink 1.6 series. 

Apache Flink® is an open-source stream processing framework for 
distributed, high-performing, always-available, and accurate data 
streaming applications. 

The release is available for download at: 
https://flink.apache.org/downloads.html 

Please check out the release blog post for an overview of the 
improvements for this bugfix release: 
https://flink.apache.org/news/2018/12/22/release-1.6.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344314

We would like to thank all contributors of the Apache Flink community 
who made this release possible! 

Regards, 
Gordon 



Re: Clarification around versioning and flink's state serialization

2018-12-21 Thread Tzu-Li (Gordon) Tai
1. Correct. Under the hood, evolvability of schema relies on the type's
serializer implementation to support it.
In Flink 1.7, this has so far been done only for Flink's built-in Avro serializer
(i.e. the AvroSerializer class), so you don't need to provide a
custom serializer for this.
For any other types, that would be required for now; again, how to
implement a custom serializer that works for schema evolution is covered in
the documents.

2. Yes, disabling generic types will make the job fail if any data type is
determined to be serialized by Kryo, be it for on-wire data
transmission or for state serialization.
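
For point 2, a minimal sketch of what that looks like (the pipeline itself is
just a placeholder):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableGenericTypesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fail fast whenever a data type would fall back to Kryo, for on-wire
        // transmission as well as for state serialization.
        env.getConfig().disableGenericTypes();

        env.fromElements("a", "b", "c").print();
        env.execute("disable-generic-types-sketch");
    }
}
```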

I'm currently still traveling because of the recent Flink Forward event;
will send you a copy of the latest slides I presented about the topic once
I get back.

Cheers,
Gordon

On Fri, Dec 21, 2018, 10:42 PM Padarn Wilson wrote:
> Yes that helps a lot!
>
>
> Just to clarify:
> - If using Avro types in 1.7, no explicit declaration of serializers needs
> to be done to have state evolution. But all other evolvable types (e.g
> Protobuf) still need to be registered and evolved manually?
> - If specifying `disableGenericTypes` on my execution context, anything
> that falls back to Kryo will cause an error.
>
> Would love to see more updated slides if you don't mind.
>
> Thanks for taking the time,
> Padarn
>
>
> On Fri, Dec 21, 2018 at 10:04 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> For the documents I would recommend reading through:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
>>
>>
>> On Fri, Dec 21, 2018, 9:55 PM Tzu-Li (Gordon) Tai > wrote:
>>
>>> Hi,
>>>
>>> Yes, if Flink does not recognize your registered state type, it will by
>>> default use Kryo for the serialization.
>>> And generally speaking, Kryo does not have good support for evolvable
>>> schemas compared to other serialization frameworks such as Avro or Protobuf.
>>>
>>> The reason why Flink defaults to Kryo for unrecognizable types has some
>>> historical reasons due to the original use of Flink's type serialization
>>> stack being used on the batch side, but IMO the short answer is that it
>>> would make sense to have a different default serializer (perhaps Avro) for
>>> snapshotting state in streaming programs.
>>> However, I believe this would be better suited as a separate discussion
>>> thread.
>>>
>>> The good news is that with Flink 1.7, state schema evolution is fully
>>> supported out of the box for Avro types, such as GenericRecord or code
>>> generated SpecificRecords.
>>> If you want to have evolvable schema for your state types, then it is
>>> recommended to use Avro as state types.
>>> Support for evolving schema of other data types such as POJOs and Scala
>>> case classes is also on the radar for future releases.
>>>
>>> Does this help answer your question?
>>>
>>> By the way, the slides you are looking at I would consider quite
>>> outdated for the topic, since Flink 1.7 was released with much smoother
>>> support for state schema evolution.
>>> An updated version of the slides is not yet publicly available, but if
>>> you want I can send you one privately.
>>> Otherwise, the Flink docs for 1.7 would also be equally helpful.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On Fri, Dec 21, 2018, 8:11 PM Padarn Wilson >> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am trying to understand the situation with state serialization in
>>>> flink. I'm looking at a number of sources, but slide 35 from here
>>>> crystalizes my confusion:
>>>>
>>>>
>>>> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>>>>
>>>> So, I understand that if 'Flink's own serialization stack' is unable to
>>>> serialize a type you define, then it will fall back on Kryo generics. In
>>>> this case, I believe what I'm being told is that state compatibility is
>>>> difficult to ensure, and schema evolution in your jobs is not possible.
>>>>
>>>> However on this slide, they say
>>>> "
>>>>Kryo is generally not  recommended ...
>>>>
>>>>Serialization frameworks with schema evolution support is
>>>> recommended: Avro, Thrift
>>>> "
>>>> So is this implying that Flink's non-default serialization stack does
>>>> not suppor

Re: Clarification around versioning and flink's state serialization

2018-12-21 Thread Tzu-Li (Gordon) Tai
For the documents I would recommend reading through:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html


On Fri, Dec 21, 2018, 9:55 PM Tzu-Li (Gordon) Tai wrote:
> Hi,
>
>
> Yes, if Flink does not recognize your registered state type, it will by
> default use Kryo for the serialization.
> And generally speaking, Kryo does not have good support for evolvable
> schemas compared to other serialization frameworks such as Avro or Protobuf.
>
> The reason why Flink defaults to Kryo for unrecognizable types has some
> historical reasons due to the original use of Flink's type serialization
> stack being used on the batch side, but IMO the short answer is that it
> would make sense to have a different default serializer (perhaps Avro) for
> snapshotting state in streaming programs.
> However, I believe this would be better suited as a separate discussion
> thread.
>
> The good news is that with Flink 1.7, state schema evolution is fully
> supported out of the box for Avro types, such as GenericRecord or code
> generated SpecificRecords.
> If you want to have evolvable schema for your state types, then it is
> recommended to use Avro as state types.
> Support for evolving schema of other data types such as POJOs and Scala
> case classes is also on the radar for future releases.
>
> Does this help answer your question?
>
> By the way, the slides you are looking at I would consider quite outdated
> for the topic, since Flink 1.7 was released with much smoother support for
> state schema evolution.
> An updated version of the slides is not yet publicly available, but if you
> want I can send you one privately.
> Otherwise, the Flink docs for 1.7 would also be equally helpful.
>
> Cheers,
> Gordon
>
>
> On Fri, Dec 21, 2018, 8:11 PM Padarn Wilson 
>> Hi all,
>>
>> I am trying to understand the situation with state serialization in
>> flink. I'm looking at a number of sources, but slide 35 from here
>> crystalizes my confusion:
>>
>>
>> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>>
>> So, I understand that if 'Flink's own serialization stack' is unable to
>> serialize a type you define, then it will fall back on Kryo generics. In
>> this case, I believe what I'm being told is that state compatibility is
>> difficult to ensure, and schema evolution in your jobs is not possible.
>>
>> However on this slide, they say
>> "
>>Kryo is generally not  recommended ...
>>
>>Serialization frameworks with schema evolution support is
>> recommended: Avro, Thrift
>> "
>> So is this implying that Flink's non-default serialization stack does not
>> support schema evolution? In this case is it best practice to register
>> custom serializers whenever possible.
>>
>> Thanks
>>
>>
>> *Grab is hiring. Learn more at **https://grab.careers
>> <https://grab.careers/>*
>>
>> By communicating with Grab Inc and/or its subsidiaries, associate
>> companies and jointly controlled entities (“Grab Group”), you are deemed to
>> have consented to processing of your personal data as set out in the
>> Privacy Notice which can be viewed at https://grab.com/privacy/
>>
>> This email contains confidential information and is only for the intended
>> recipient(s). If you are not the intended recipient(s), please do not
>> disseminate, distribute or copy this email and notify Grab Group
>> immediately if you have received this by mistake and delete this email from
>> your system. Email transmission cannot be guaranteed to be secure or
>> error-free as any information therein could be intercepted, corrupted,
>> lost, destroyed, delayed or incomplete, or contain viruses. Grab Group do
>> not accept liability for any errors or omissions in the contents of this
>> email arises as a result of email transmission. All intellectual property
>> rights in this email and attachments therein shall remain vested in Grab
>> Group, unless otherwise provided by law.
>>
>


Re: Clarification around versioning and flink's state serialization

2018-12-21 Thread Tzu-Li (Gordon) Tai
Hi,

Yes, if Flink does not recognize your registered state type, it will by
default use Kryo for the serialization.
And generally speaking, Kryo does not have good support for evolvable
schemas compared to other serialization frameworks such as Avro or Protobuf.

Flink's default to Kryo for unrecognizable types is mostly historical, stemming
from its type serialization stack originally being built for the batch side,
but IMO the short answer is that it
would make sense to have a different default serializer (perhaps Avro) for
snapshotting state in streaming programs.
However, I believe this would be better suited as a separate discussion
thread.

The good news is that with Flink 1.7, state schema evolution is fully
supported out of the box for Avro types, such as GenericRecord or code
generated SpecificRecords.
If you want to have evolvable schema for your state types, then it is
recommended to use Avro as state types.
Support for evolving schema of other data types such as POJOs and Scala
case classes is also on the radar for future releases.

Does this help answer your question?

By the way, the slides you are looking at I would consider quite outdated
for the topic, since Flink 1.7 was released with much smoother support for
state schema evolution.
An updated version of the slides is not yet publicly available, but if you
want I can send you one privately.
Otherwise, the Flink docs for 1.7 would also be equally helpful.

Cheers,
Gordon


On Fri, Dec 21, 2018, 8:11 PM Padarn Wilson wrote:
> Hi all,
>
>
> I am trying to understand the situation with state serialization in flink.
> I'm looking at a number of sources, but slide 35 from here crystalizes my
> confusion:
>
>
> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>
> So, I understand that if 'Flink's own serialization stack' is unable to
> serialize a type you define, then it will fall back on Kryo generics. In
> this case, I believe what I'm being told is that state compatibility is
> difficult to ensure, and schema evolution in your jobs is not possible.
>
> However on this slide, they say
> "
>Kryo is generally not  recommended ...
>
>Serialization frameworks with schema evolution support is
> recommended: Avro, Thrift
> "
> So is this implying that Flink's non-default serialization stack does not
> support schema evolution? In this case is it best practice to register
> custom serializers whenever possible.
>
> Thanks
>
>
> *Grab is hiring. Learn more at **https://grab.careers
> *
>
> By communicating with Grab Inc and/or its subsidiaries, associate
> companies and jointly controlled entities (“Grab Group”), you are deemed to
> have consented to processing of your personal data as set out in the
> Privacy Notice which can be viewed at https://grab.com/privacy/
>
> This email contains confidential information and is only for the intended
> recipient(s). If you are not the intended recipient(s), please do not
> disseminate, distribute or copy this email and notify Grab Group
> immediately if you have received this by mistake and delete this email from
> your system. Email transmission cannot be guaranteed to be secure or
> error-free as any information therein could be intercepted, corrupted,
> lost, destroyed, delayed or incomplete, or contain viruses. Grab Group do
> not accept liability for any errors or omissions in the contents of this
> email arises as a result of email transmission. All intellectual property
> rights in this email and attachments therein shall remain vested in Grab
> Group, unless otherwise provided by law.
>


Re: Connection leak with flink elastic Sink

2018-12-14 Thread Tzu-Li (Gordon) Tai
Hi,

(Removed dev@ from the mail thread)

I took a look at the logs you provided, and it seems like the sink operators 
should have been properly torn down, which would in turn close the 
RestHighLevelClient used internally.

I’m at this point not really sure what else could have caused this besides a 
bug with the Elasticsearch client itself not cleaning up properly.
Have you tried turning on debug level for logging to see if there is anything 
suspicious?

Cheers,
Gordon


On 13 December 2018 at 7:35:33 PM, Vijay Bhaskar (bhaskar.eba...@gmail.com) 
wrote:

Hi Gordon,
We are using flink cluster 1.6.1, elastic search connector version: 
flink-connector-elasticsearch6_2.11
Attached the stack trace. 

Following are the max open file descriptor limit of the TaskManager process
and the open connections to the Elasticsearch cluster:

Regards
Bhaskar
#lsof -p 62041 | wc -l
65583
All the connections to elastic cluster reached to:
netstat -aln | grep 9200 | wc -l
2333



On Thu, Dec 13, 2018 at 4:12 PM Tzu-Li (Gordon) Tai  wrote:
Hi,

Besides the information that Chesnay requested, could you also provide a stack 
trace of the exception that caused the job to terminate in the first place?

The Elasticsearch sink does indeed close the internally used Elasticsearch 
client, which should in turn properly release all resources [1].
I would like to double check whether or not the case here is that that part of 
the code was never reached.

Cheers,
Gordon

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L334

On 13 December 2018 at 5:59:34 PM, Chesnay Schepler (ches...@apache.org) wrote:

Specifically which connector are you using, and which Flink version?

On 12.12.2018 13:31, Vijay Bhaskar wrote:
> Hi
> We are using flink elastic sink which streams at the rate of 1000
> events/sec, as described in
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html.
> We are observing connection leak of elastic connections. After few
> minutes all the open connections are exceeding the process limits of
> the max open descriptors and Job is getting terminated. But the http
> connections with the elastic search server remain open forever. Am i
> missing any specific configuration setting to close the open
> connection, after serving the request?
> But there is no such setting is described in the above documentation
> of elastic sink
>
> Regards
> Bhaskar




Re: Encountered the following Expired Iterator exception in getRecords using FlinkKinesisConsumer

2018-12-14 Thread Tzu-Li (Gordon) Tai
Hi,

I’m suspecting that this is the issue: 
https://issues.apache.org/jira/browse/FLINK-11164.

One more thing to clarify to be sure of this:
Do you have multiple shards in the Kinesis stream, and if yes, are some of them 
actually empty?
Meaning that, even though you mentioned some records were written to the 
Kinesis stream, some shards actually weren’t written any records.

Cheers,
Gordon


On 14 December 2018 at 4:10:30 AM, Vijay Balakrishnan (bvija...@gmail.com) 
wrote:

Hi Gordon,

My use-case was slightly different.

1.  Started a Kinesis connector source, with TRIM_HORIZON as the startup 
position.
2. Only a few Records were written to the Kinesis stream
3. The FlinkKinesisConsumer reads the records from Kinesis stream. Then after a 
period of time of not reading anymore Kinesis Stream records, it received the 
“Encountered an unexpected expired iterator” warning in the logs, and the job 
failed with the misleading AmazonKinesisException?

Also, in 1. with LATEST as the startup position, I have not been able to read 
any records from the Kinesis stream. Still trying to pinpoint what I am doing 
wrong. For sure, I am not using checkpoints, and I'm not sure if this causes any 
issues with the LATEST option.
TIA,
Vijay

On Thu, Dec 13, 2018 at 2:59 AM Tzu-Li (Gordon) Tai  wrote:
Hi!

Thanks for reporting this.

This looks like an overlooked corner case that the Kinesis connector doesn’t 
handle properly.

First, let me clarify the case and how it can be reproduced. Please let me know 
if the following is correct:
1. You started a Kinesis connector source, with TRIM_HORIZON as the startup 
position.
2. No records were written to the Kinesis stream at all.
3. After a period of time, you received the “Encountered an unexpected expired 
iterator” warning in the logs, and the job failed with the misleading 
AmazonKinesisException?

Cheers,
Gordon

On 13 December 2018 at 6:53:11 AM, Vijay Balakrishnan (bvija...@gmail.com) 
wrote:

Hi,
Using FlinkKinesisConsumer in a long running Flink Streaming app consuming from 
a Kinesis Stream. 
Encountered the following Expired Iterator exception in getRecords():
 org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
Encountered an unexpected expired iterator 
 
The error on the console ends up being a misleading one: "Caused by: 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
 1 validation error detected: Value 'EARLIEST_SEQUENCE_NUM' at 
'startingSequenceNumber' failed to satisfy constraint: Member must satisfy 
regular expression pattern: 0|([1-9]\d{0,128}) (Service: AmazonKinesis; Status 
Code: 400; Error Code: ValidationException; Request ID: ..)
"
 
How do I increase the ClientConfiguration.clientExecutiontimeout to avoid this 
issue or is this the right way to handle this issue ? I don't want the 
FlinkKinesisConsumer streaming app to fail just because there might be no 
records in the Kinesis Stream. I am using TRIM_HORIZON to read from the start 
of the Kinesis Stream.
 
 TIA,

Re: Encountered the following Expired Iterator exception in getRecords using FlinkKinesisConsumer

2018-12-13 Thread Tzu-Li (Gordon) Tai
Hi!

Thanks for reporting this.

This looks like an overlooked corner case that the Kinesis connector doesn’t 
handle properly.

First, let me clarify the case and how it can be reproduced. Please let me know 
if the following is correct:
1. You started a Kinesis connector source, with TRIM_HORIZON as the startup 
position.
2. No records were written to the Kinesis stream at all.
3. After a period of time, you received the “Encountered an unexpected expired 
iterator” warning in the logs, and the job failed with the misleading 
AmazonKinesisException?

Cheers,
Gordon
On 13 December 2018 at 6:53:11 AM, Vijay Balakrishnan (bvija...@gmail.com) 
wrote:

Hi,
Using FlinkKinesisConsumer in a long running Flink Streaming app consuming from 
a Kinesis Stream. 
Encountered the following Expired Iterator exception in getRecords():
 org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] - 
Encountered an unexpected expired iterator 
 
The error on the console ends up being a misleading one: "Caused by: 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:
 1 validation error detected: Value 'EARLIEST_SEQUENCE_NUM' at 
'startingSequenceNumber' failed to satisfy constraint: Member must satisfy 
regular expression pattern: 0|([1-9]\d{0,128}) (Service: AmazonKinesis; Status 
Code: 400; Error Code: ValidationException; Request ID: ..)
"
 
How do I increase the ClientConfiguration.clientExecutiontimeout to avoid this 
issue or is this the right way to handle this issue ? I don't want the 
FlinkKinesisConsumer streaming app to fail just because there might be no 
records in the Kinesis Stream. I am using TRIM_HORIZON to read from the start 
of the Kinesis Stream.
 
 TIA,

Re: Connection leak with flink elastic Sink

2018-12-13 Thread Tzu-Li (Gordon) Tai
Hi,

Besides the information that Chesnay requested, could you also provide a stack 
trace of the exception that caused the job to terminate in the first place?

The Elasticsearch sink does indeed close the internally used Elasticsearch 
client, which should in turn properly release all resources [1].
I would like to double check whether or not the case here is that that part of 
the code was never reached.

Cheers,
Gordon

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L334

On 13 December 2018 at 5:59:34 PM, Chesnay Schepler (ches...@apache.org) wrote:

Specifically which connector are you using, and which Flink version?  

On 12.12.2018 13:31, Vijay Bhaskar wrote:  
> Hi  
> We are using flink elastic sink which streams at the rate of 1000  
> events/sec, as described in  
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html.
>   
> We are observing connection leak of elastic connections. After few  
> minutes all the open connections are exceeding the process limits of  
> the max open descriptors and Job is getting terminated. But the http  
> connections with the elastic search server remain open forever. Am i  
> missing any specific configuration setting to close the open  
> connection, after serving the request?  
> But there is no such setting is described in the above documentation  
> of elastic sink  
>  
> Regards  
> Bhaskar  




Re: Using FlinkKinesisConsumer through a proxy

2018-12-01 Thread Tzu-Li (Gordon) Tai
Good to hear that it's working, thanks for the update!

On Sat, Dec 1, 2018, 4:29 AM Vijay Balakrishnan wrote:
> Hi Gordon,
> Finally figured out my issue. Do not need to add http:// in proxyHost name.
> String proxyHost= "proxy-chaincom";//not http://proxy-chain...com
> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
> "proxyHost", proxyHost);//<== mo http:// in proxyHost name
>
> TIA,
> Vijay
>
>
> On Wed, Nov 14, 2018 at 12:50 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Vijay,
>>
>> I’m pretty sure that this should work with the properties that you
>> provided, unless the AWS Kinesis SDK isn’t working as expected.
>>
>> What I’ve tested is that with those properties, the ClientConfiguration
>> used to build the Kinesis client has the proxy domain / host / ports etc.
>> properly set.
>> And according to [1], this should be enough to configure the constructed
>> Kinesis client to connect via the proxy.
>>
>> Cheers,
>> Gordon
>>
>> [1]
>> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/section-client-configuration.html
>>
>>
>> On 7 November 2018 at 1:19:02 AM, Vijay Balakrishnan (bvija...@gmail.com)
>> wrote:
>>
>> Hi Gordon,
>> This still didn't work :(
>>
>> Tried a few combinations with:
>> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
>> "proxyDomain", "...");
>>
>> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
>> "proxyHost", "http://.com");
>>
>> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
>> "proxyPort", "911");
>>
>> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
>> "proxyUsername", "...");
>>
>> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
>> "proxyPassword", "..");
>>
>> kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
>> "nonProxyHosts", "
>>
>>
>> How does the FlinkKinesisProducer work so seamlessly through a proxy ?
>> TIA,
>> Vijay
>>
>> On Thu, Oct 4, 2018 at 6:41 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi,
>>>
>>> Since Flink 1.5, you should be able to set all available configurations
>>> on the ClientConfiguration through the consumer Properties (see FLINK-9188
>>> [1]).
>>>
>>> The way to do that would be to prefix the configuration you want to set
>>> with "aws.clientconfig" and add that to the properties, as such:
>>>
>>> ```
>>> Properties kinesisConsumerProps = new Properties();
>>> kinesisConsumerProps.setProperty("aws.clientconfig.proxyHost", ...);
>>> kinesisConsumerProps.setProperty("aws.clientconfig.proxyPort", ...);
>>> kinesisConsumerProps.setProperty("aws.clientconfig.proxyUsert", ...);
>>> ...
>>> ```
>>>
>>> Could you try that out and see if it works for you?
>>>
>>> I've also realized that this feature isn't documented very well, and
>>> have opened a ticket for that [2].
>>>
>>> Cheers,
>>> Gordon
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9188
>>> [2] https://issues.apache.org/jira/browse/FLINK-10492
>>>
>>> On Thu, Oct 4, 2018, 7:57 PM Aljoscha Krettek 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm looping in Gordon and Thomas, they might have some idea about how
>>>> to resolve this.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 3. Oct 2018, at 17:29, Vijay Balakrishnan 
>>>> wrote:
>>>>
>>>> I have been trying with all variations  to no avail of java
>>>> -Dhttp.nonProxyHosts=..  -Dhttps.proxyHost=http://...
>>>> -Dhttps.proxyPort=911 -Dhttps.proxyUser= -Dhttps.proxyPassword=..
>>>> -Dhttp.proxyHost=http://.. -Dhttp.proxyPort=911 -Dhttp.proxyUser=...
>>>> -Dhttp.proxyPassword=... -jar .. after looking at the code in
>>>> com.amazonaws.ClientConfiguration
>>>>
>>>> On Tue, Oct 2, 2018 at 3:49 PM Vijay Balakrishnan 
>>>> wrote:
>>>>
>>>>> HI,
>>>>> How do I use FlinkKinesisConsumer using the Properties through a proxy
>>>>> ? Getting a Connection issue through the proxy.
>>>>> Works outside the proxy.
>>>>>
>>>>> Properties kinesisConsumerConfig = new Properties();
>>>>>
>>>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
>>>>>
>>>>> if (local) {
>>>>>
>>>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
>>>>> accessKey);
>>>>>
>>>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
>>>>> secretKey);
>>>>> } else {
>>>>>
>>>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
>>>>> "AUTO");
>>>>> }
>>>>>
>>>>> //only for Consumer
>>>>>
>>>>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
>>>>> "1");
>>>>>
>>>>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
>>>>> "2000");
>>>>> FlinkKinesisConsumer>
>>>>> kinesisConsumer = new FlinkKinesisConsumer<>(
>>>>> "kinesisTopicRead", new Tuple2KinesisSchema(),
>>>>> kinesisConsumerConfig);
>>>>> TIA
>>>>>
>>>>
>>>>


Re: Last batch of stream data could not be sinked when data comes very slow

2018-11-14 Thread Tzu-Li (Gordon) Tai
Hi Henry,

Flushing of buffered data in sinks should occur on two occasions: 1) when a 
buffer size limit is reached or a fixed flush interval fires, and 2) on 
checkpoints.

Flushing any pending data before completing a checkpoint ensures the sink has 
at-least-once guarantees, so that should answer your question about data loss.
For data delay due to the buffering, my only suggestion would be to have a 
time-interval based flushing configuration.
That is what is currently happening, for example, in the Kafka / Kinesis 
producer sinks. Records are buffered, and flushed at fixed intervals or when 
the buffer is full. They are also flushed on every checkpoint.
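
To make the suggestion concrete, here is a minimal sketch of such a sink (this
is not the actual Kafka / Kinesis producer code; the record type and
writeBatch() are placeholders): it flushes when the batch is full, on a fixed
timer so that a slow stream cannot delay data indefinitely, and in
snapshotState() so that no buffered data is lost across checkpoints.

```
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private static final int MAX_BATCH_SIZE = 50;
    private static final long FLUSH_INTERVAL_MS = 1000L;

    private final List<String> buffer = new ArrayList<>();
    private transient Timer flushTimer;

    @Override
    public void open(Configuration parameters) {
        flushTimer = new Timer(true);
        flushTimer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                flush(); // time-based flush, independent of incoming records
            }
        }, FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS);
    }

    @Override
    public void invoke(String value, Context context) {
        synchronized (buffer) {
            buffer.add(value);
            if (buffer.size() >= MAX_BATCH_SIZE) {
                flush(); // size-based flush
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        flush(); // flush pending data before the checkpoint completes (at-least-once)
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // nothing restored in this sketch; a real sink could also keep the buffer in operator state
    }

    @Override
    public void close() {
        if (flushTimer != null) {
            flushTimer.cancel();
        }
        flush();
    }

    private void flush() {
        synchronized (buffer) {
            if (!buffer.isEmpty()) {
                writeBatch(buffer);
                buffer.clear();
            }
        }
    }

    private void writeBatch(List<String> batch) {
        // placeholder: write the batch to the external system (JDBC, HTTP, ...)
    }
}
```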

Cheers,
Gordon

On 13 November 2018 at 5:07:32 PM, 徐涛 (happydexu...@gmail.com) wrote:

Hi Experts,
When we implement a sink, we usually batch writes, flushing after a certain 
number of records or after a time interval. However, this may leave the data of 
the last batch unwritten, because flushing is only triggered by incoming records.
I also tested the JDBCOutputFormat provided by Flink, and found that it has the 
same problem. If the batch size is 50 and 49 items arrive, but the last one 
comes an hour later, then the 49 items will not be written to the sink during 
that hour. This may cause data delay or data loss.
So could anyone propose a solution to this problem?
Thanks a lot.

Best
Henry 

Re: Best practice to write data from a stream to non-relational, distributed database (hbase)

2018-11-14 Thread Tzu-Li (Gordon) Tai
Hi,

Have you taken a look yet at this [1]? That is an example of writing a stream 
to HBase.

Cheers,
Gordon

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
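
As a minimal sketch of the same idea (the table, column family and qualifier
names are made up, and any security configuration is omitted): open the HBase
connection once per subtask in open(), write one Put per record in invoke(),
and release everything in close().

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSink extends RichSinkFunction<Tuple2<String, String>> {

    private transient Connection connection;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One HBase connection per parallel sink instance.
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(hbaseConf);
        table = connection.getTable(TableName.valueOf("my-table"));
    }

    @Override
    public void invoke(Tuple2<String, String> record, Context context) throws Exception {
        Put put = new Put(Bytes.toBytes(record.f0));               // row key
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("data"),  // column family / qualifier
                Bytes.toBytes(record.f1));
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) {
            table.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
```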


On 11 November 2018 at 5:45:27 PM, Marke Builder (marke.buil...@gmail.com) 
wrote:

Hi,

what is the preferred way to write streaming data to HBase?
Rolling File Sink or Streaming File Sink?
How can I configure this (open the connection with conf, and the write 
handling (key, data))?
What do I have to consider about the partitions? I would prefer one write per 
partition. 

Thanks! 
Marke

Re: Flink auth against Zookeeper with MD5-Digest

2018-11-14 Thread Tzu-Li (Gordon) Tai
Hi,

AFAIK, I don’t think there has been other discussions on this other than the 
original document on secured data access for Flink [1].

Unfortunately, I’m also not knowledgeable enough to comment on how feasible it 
would be to support MD5-Digest for authentication.
Maybe Eron (cc’ed) can chime in here if he has any comments.

Cheers,
Gordon

[1] 
https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing


On 12 November 2018 at 11:41:24 PM, Laura Uzcátegui 
(laura.uzcateg...@gmail.com) wrote:

Hi, 

 I was wondering if there are any plans in the near future to include support 
for an authentication mechanism other than Kerberos, such as MD5-Digest? 

Cheers, 
