Re: Understanding timestamp and watermark assignment errors

2019-03-08 Thread Andrew Roberts
This is with Flink 1.6.4. I was on 1.6.2 and saw Kryo issues in many more 
circumstances. 

> On Mar 8, 2019, at 4:25 PM, Konstantin Knauf  wrote:
> 
> Hi Andrew, 
> 
> which Flink version do you use? This sounds a bit like 
> https://issues.apache.org/jira/browse/FLINK-8836.
> 
> Cheers, 
> 
> Konstantin
> 
>> On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts  wrote:
>> Hello,
>> 
>> I’m trying to convert some of our larger stateful computations into 
>> something that aligns more with the Flink windowing framework, and 
>> particularly, start using “event time” instead of “ingest time” as the time 
>> characteristic.
>> 
>> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka 
>> source), and while my data is generally time-ordered, there are some 
>> upstream races, so I’m attempting to assign timestamps and watermarks using 
>> BoundedOutOfOrdernessTimestampExtractor, and a lateness of 30 seconds. When 
>> I assign timestamps directly in the Kafka sources (I’m also connecting two 
>> Kafka streams here) using 
>> FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work ok, but my 
>> extractor has to do a bunch of “faking” because not every record that is 
>> produced will have a valid timestamp - for example, a record that can’t be 
>> parsed won’t.
>> 
>> When I assign timestamps downstream, after filtering the stream down to just 
>> records that are going to be windowed, I see errors in my Flink job:
>> 
>> java.io.IOException: Exception while applying AggregateFunction in 
>> aggregating state
>> at 
>> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>> at 
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>> at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>> at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>> at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>> at 
>> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
>> ... 6 more
>> 
>> I am calling aggregate() on my windows, but otherwise I see very little 
>> information that I can use to dig into this issue. Can anyone give me any 
>> insight into what is going wrong here? I’d much prefer assigning timestamps 
>> after filtering, rather than in the Kafka source, because I can filter down 
>> to only records that I know will have timestamps.
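>> 
>> For concreteness, the downstream assignment I have in mind looks roughly like 
>> this (just a sketch - the record type, timestamp field, key selector, window and 
>> aggregate function are placeholders):
>> 
>> filteredStream   // only records that are known to carry a valid event timestamp
>>     .assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor<MyRecord>(Time.seconds(30)) {
>>             @Override
>>             public long extractTimestamp(MyRecord record) {
>>                 return record.getEventTimeMillis();
>>             }
>>         })
>>     .keyBy(r -> r.getKey())
>>     .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>     .aggregate(new MyAggregateFunction());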
>> 
>> When experimenting with the lateness in my timestamp/watermark assigner, I 
>> also saw a similarly opaque exception:
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output 
>> watermark: 
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>> at 
>> 

Re: Understanding timestamp and watermark assignment errors

2019-03-08 Thread Konstantin Knauf
Hi Andrew,

which Flink version do you use? This sounds a bit like
https://issues.apache.org/jira/browse/FLINK-8836.

Cheers,

Konstantin

On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts  wrote:

> Hello,
>
> I’m trying to convert some of our larger stateful computations into
> something that aligns more with the Flink windowing framework, and
> particularly, start using “event time” instead of “ingest time” as the time
> characteristic.
>
> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka
> source), and while my data is generally time-ordered, there are some
> upstream races, so I’m attempting to assign timestamps and watermarks using
> BoundedOutOfOrdernessTimestampExtractor, and a lateness of 30 seconds. When
> I assign timestamps directly in the Kafka sources (I’m also connecting two
> Kafka streams here) using
> FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work ok, but my
> extractor has to do a bunch of “faking” because not every record that is
> produced will have a valid timestamp - for example, a record that can’t be
> parsed won’t.
>
> When I assign timestamps downstream, after filtering the stream down to
> just records that are going to be windowed, I see errors in my Flink job:
>
> java.io.IOException: Exception while applying AggregateFunction in
> aggregating state
> at
> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
> at
> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
> ... 6 more
>
> I am calling aggregate() on my windows, but otherwise I see very little
> information that I can use to dig into this issue. Can anyone give me any
> insight into what is going wrong here? I’d much prefer assigning timestamps
> after filtering, rather than in the Kafka source, because I can filter down
> to only records that I know will have timestamps.
>
> When experimenting with the lateness in my timestamp/watermark assigner, I
> also saw a similarly opaque exception:
>
> java.lang.RuntimeException: Exception occurred while processing valve
> output watermark:
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at 

Re: Backoff strategies for async IO functions?

2019-03-08 Thread Konstantin Knauf
Hi William,

the AsyncOperator does not have such a setting. It is "merely" a wrapper
around an asynchronous call, which provides integration with Flink's state
& time management.

I think the way to go would be to do the exponential back-off in the user
code and set the timeout of the AsyncOperator to the sum of the timeouts in
the user code (e.g. 2s + 4s + 8s + 16s).
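
A sketch of what that could look like with a RichAsyncFunction (the query()
method is left abstract here because the actual web client is job-specific, and
the retry budget is just an example):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public abstract class BackoffAsyncFunction extends RichAsyncFunction<String, String> {

    private static final int MAX_ATTEMPTS = 4;       // waits of 2s, 4s, 8s between the 4 attempts
    private transient ScheduledExecutorService retryTimer;

    /** One asynchronous request against the rate-limited API (implemented per job). */
    protected abstract CompletableFuture<String> query(String key);

    @Override
    public void open(Configuration parameters) {
        retryTimer = Executors.newSingleThreadScheduledExecutor();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        attempt(key, resultFuture, 1);
    }

    private void attempt(String key, ResultFuture<String> resultFuture, int attemptNo) {
        query(key).whenComplete((response, error) -> {
            if (error == null) {
                resultFuture.complete(Collections.singleton(response));
            } else if (attemptNo >= MAX_ATTEMPTS) {
                resultFuture.completeExceptionally(error);   // give up, let Flink handle the failure
            } else {
                long delaySeconds = 1L << attemptNo;         // 2s, 4s, 8s, ...
                retryTimer.schedule(() -> attempt(key, resultFuture, attemptNo + 1),
                        delaySeconds, TimeUnit.SECONDS);
            }
        });
    }

    @Override
    public void close() {
        if (retryTimer != null) {
            retryTimer.shutdownNow();
        }
    }
}

The timeout passed to AsyncDataStream.unorderedWait/orderedWait then has to cover
the whole back-off sequence plus the individual request timeouts, as described
above.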

Cheers,

Konstantin


On Thu, Mar 7, 2019 at 5:20 PM William Saar  wrote:

> Hi,
> Is there a way to specify an exponential backoff strategy for when async
> function calls fail?
>
> I have an async function that does web requests to a rate-limited API. Can
> you handle that with settings on the async function call?
>
> Thanks,
> William
>
>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: What does "Continuous incremental cleanup" mean in Flink 1.8 release notes

2019-03-08 Thread Konstantin Knauf
Hi Tony,

Before Flink 1.8, expired state was only cleaned up when you tried to access
it after expiration, i.e. when user code accessed the expired state, the
state value was cleared and "null" was returned. There was also already
the option to clean up expired state during full snapshots (
https://github.com/apache/flink/pull/6460). With Flink 1.8, expired state is
cleaned up continuously in the background, regardless of checkpointing or
any attempt to access it after expiration.
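
For illustration, enabling the new background cleanup on a state descriptor looks
roughly like this (a sketch against the 1.8 builder API as I understand it; if I
remember correctly, in 1.8 the RocksDB compaction filter additionally has to be
enabled via state.backend.rocksdb.ttl.compaction.filter.enabled in the Flink
configuration):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(7))                 // time-to-live of the entries
        .cleanupIncrementally(10, false)          // new in 1.8: incremental cleanup for the heap backend
        .cleanupInRocksdbCompactFilter()          // new in 1.8: cleanup during RocksDB compaction
        .build();

ValueStateDescriptor<Long> lastSeen = new ValueStateDescriptor<>("lastSeen", Long.class);
lastSeen.enableTimeToLive(ttlConfig);             // expired entries are now also dropped in the background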

As a reference the linked JIRA tickets should be a good starting point.

Hope this helps.

Konstantin




On Fri, Mar 8, 2019 at 10:45 AM Tony Wei  wrote:

> Hi everyone,
>
> I read the Flink 1.8 release notes about state [1], and it said
>
> *Continuous incremental cleanup of old Keyed State with TTL*
>> We introduced TTL (time-to-live) for Keyed state in Flink 1.6 (FLINK-9510
>> ). This feature
>> allowed to clean up and make inaccessible keyed state entries when
>> accessing them. In addition state would now also being cleaned up when
>> writing a savepoint/checkpoint.
>> Flink 1.8 introduces continous cleanup of old entries for both the
>> RocksDB state backend (FLINK-10471
>> ) and the heap state
>> backend (FLINK-10473 ).
>> This means that old entries (according to the ttl setting) are continously
>> being cleanup up.
>
>
> I'm not familiar with TTL's implementation in Flink 1.6 or with the new
> features introduced in Flink 1.8. After reading the release notes, I don't
> understand the difference between these two release versions. Did they change
> the outcome of the TTL feature, provide new TTL features, or just change how
> the TTL mechanism is executed?
>
> Could you give me more references to learn about it? A simple example
> to illustrate it would be much appreciated. Thank you.
>
> Best,
> Tony Wei
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.8.html#state
>




Re: [DISCUSS] Create a Flink ecosystem website

2019-03-08 Thread Bowen Li
Confluent Hub  for Kafka is another good
example of this kind. I personally like it over the Spark site. It may be worth
checking it out with the Kafka folks.

On Thu, Mar 7, 2019 at 6:06 AM Becket Qin  wrote:

> Absolutely! Thanks for the pointer. I'll submit a PR to update the
> ecosystem page and the navigation.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Mar 7, 2019 at 8:47 PM Robert Metzger  wrote:
>
> > Okay. I will reach out to spark-packages.org and see if they are willing
> > to share.
> >
> > Do you want to raise a PR to update the ecosystem page (maybe sync with
> > the "Software Projects" listed here:
> > https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink) and
> > link it in the navigation?
> >
> > Best,
> > Robert
> >
> >
> > On Thu, Mar 7, 2019 at 10:13 AM Becket Qin  wrote:
> >
> >> Hi Robert,
> >>
> >> I think it is at least worth checking whether the spark-packages.org owners are
> >> willing to share. Thanks for volunteering to write the requirement
> >> descriptions! In any case, that will be very helpful.
> >>
> >> Since a static page has almost no cost, and we will need it to redirect
> >> to the dynamic site anyways, how about we first do that while working on
> >> the dynamic website?
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Thu, Mar 7, 2019 at 4:59 AM Ufuk Celebi  wrote:
> >>
> >>> I like Shaoxuan's idea to keep this a static site first. We could then
> >>> iterate on this and make it a dynamic thing. Of course, if we have the
> >>> resources in the community to quickly start with a dynamic site, I'm
> >>> not opposed.
> >>>
> >>> – Ufuk
> >>>
> >>> On Wed, Mar 6, 2019 at 2:31 PM Robert Metzger 
> >>> wrote:
> >>> >
> >>> > Awesome! Thanks a lot for looking into this Becket! The VMs hosted by
> >>> Infra
> >>> > look suitable.
> >>> >
> >>> > @Shaoxuan: There is actually already a static page. It used to be
> >>> linked,
> >>> > but has been removed from the navigation bar for some reason. This is
> >>> the
> >>> > page: https://flink.apache.org/ecosystem.html
> >>> > We could update the page and add it back to the navigation bar for
> the
> >>> > coming weeks. What do you think?
> >>> >
> >>> > I would actually like to push for a dynamic page right away.
> >>> >
> >>> > I know it's kind of a bold move, but how do you feel about sending
> the
> >>> > owners of spark-packages.org a short note, if they are interested in
> >>> > sharing the source? We could maintain the code together in a public
> >>> repo.
> >>> > If they are not interested in sharing, or we decide not to ask in the
> >>> first
> >>> > place, I'm happy to write down a short description of the
> requirements,
> >>> > maybe some mockups. We could then see if we find somebody here in the
> >>> > community who's willing to implement it.
> >>> > Given the number of people who are eager to contribute, I believe we
> >>> will
> >>> > be able to find somebody pretty soon.
> >>> >
> >>> >
> >>> > On Wed, Mar 6, 2019 at 3:49 AM Becket Qin 
> >>> wrote:
> >>> >
> >>> > > Forgot to provide the link...
> >>> > >
> >>> > > [1] https://www.apache.org/dev/services.html#blogs (Apache infra
> >>> services)
> >>> > > [2] https://www.apache.org/dev/freebsd-jails (FreeBSD Jail
> provided
> >>> by
> >>> > > Apache Infra)
> >>> > >
> >>> > > On Wed, Mar 6, 2019 at 10:46 AM Becket Qin 
> >>> wrote:
> >>> > >
> >>> > >> Hi Robert,
> >>> > >>
> >>> > >> Thanks for the feedback. These are good points. We should
> absolutely
> >>> > >> shoot for a dynamic website to support more interactions in the
> >>> community.
> >>> > >> There might be a few things to solve:
> >>> > >> 1. The website code itself. An open source solution would be
> great.
> >>> TBH,
> >>> > >> I do not have much experience on building a website. It'll be
> great
> >>> if
> >>> > >> someone could help comment on the solution here.
> >>> > >> 2. The hardware to host the website. Apache Infra provides a few
> >>> > >> services[1] that Apache projects can leverage. I did not see
> >>> database
> >>> > >> service, but maybe we can run a simple MySQL db in FreeBSD
> jail[2].
> >>> > >>
> >>> > >> @Bowen & vino, thanks for the positive feedback!
> >>> > >>
> >>> > >> @Shaoxuan Wang 
> >>> > >> Thanks for the suggestion. That sounds reasonable to me. We
> >>> probably need
> >>> > >> a page in the Flink official site anyways, even just provide links
> >>> it to
> >>> > >> the ecosystem website. So listing the connectors in that static
> >>> page seems
> >>> > >> something we could start with while we are working on the dynamic
> >>> pages.
> >>> > >>
> >>> > >> Thanks,
> >>> > >>
> >>> > >> Jiangjie (Becket) Qin
> >>> > >>
> >>> > >> On Wed, Mar 6, 2019 at 10:40 AM Shaoxuan Wang <
> wshaox...@gmail.com>
> >>> > >> wrote:
> >>> > >>
> >>> > >>> Hi Becket and Robert,
> >>> > >>>
> >>> > >>> I like this idea!  Let us roll this out with Flink connectors at
> >>> the
> >>> > >>> first beginning. We can start with a static page, and 

Re: Schema Evolution on Dynamic Schema

2019-03-08 Thread shkob1
Thanks Rong,

I made a quick test changing the SQL select (adding a select field in the
middle), reran the job from a savepoint, and it worked without any errors. I
want to make sure I understand at what point the state is stored and how it
works.

Let's simplify the scenario and forget my specific case of a dynamically
generated Pojo. Let's focus on the generic steps of:
Source -> register table -> SQL select and group by session -> retracted stream
(Row) -> transformToPojo (custom Map function) -> pushToSink

And let's assume the SQL select is changed (a field is added somewhere in
the middle of the select fields).
So:
We had intermediate results in the old format that are loaded from state
into the new Row object in the retracted stream. Is that an accurate
statement? At which operator, and in which format, is the state stored in this
case? Is it the SQL result/Row? Is it the Pojo? Since this scenario does not
fail for me, I'm trying to understand how/where it is handled in Flink.







Re: [EXTERNAL] Flink 1.7.1 KafkaProducer error using Exactly Once semantic

2019-03-08 Thread Slotterback, Chris
Hi Timothy,

I recently faced a similar issue that spawned a bug discussion from the devs: 
https://issues.apache.org/jira/browse/FLINK-11654

As far as I can tell your understanding is correct; we also renamed the UID 
using the job name to force uniqueness across identical jobs writing to the same 
broker. The downside for us is keeping the job names unique while dealing with 
things like multiple datacenters and multiple dev/qa/prod environments.
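
In code it boils down to something like this (a sketch; how the job name is
obtained, the map function and the exactly-once producer are placeholders):

String jobName = parameterTool.getRequired("job-name");    // e.g. "orders-dc1-prod"

stream.map(new EnrichFunction())
      .name("enrich-" + jobName)                 // per-job operator name
      .uid("enrich-" + jobName)
      .addSink(exactlyOnceKafkaProducer)
      .name("kafka-sink-" + jobName)             // keeps the generated transactional IDs from colliding
      .uid("kafka-sink-" + jobName);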

As for your idea of requiring the identifier to be set when using 
Semantic.EXACTLY_ONCE, I agree with you, as it was not immediately obvious to us 
that there was a collision occurring between the jobs.

Chris

From: Timothy Victor 
Date: Friday, March 8, 2019 at 7:28 AM
To: user 
Subject: [EXTERNAL] Flink 1.7.1 KafkaProducer error using Exactly Once semantic

Yesterday I came across a weird problem when attempting to run 2 nearly 
identical jobs on a cluster.  I was able to solve it (or rather workaround it), 
but am sharing here so we can consider a potential fix in Flink's KafkaProducer 
code.

My scenario is as follows.  I have a Flink program that reads from a Kafka 
topic, does a simple Map() operation and writes to a Kafka topic with exactly 
once semantic.  The source and sink topics are configurable, and the Map() 
operation is also configurable (i.e. based on CLI arguments it can choose 
between a set of Map() operations).  The job does not name any of its operators 
(this part sounds trivial, right?...but read on).   I run an instance of this 
program (job) on a Flink standalone cluster running 1.7.1.  The cluster has 12 
TMs, each with 1 slot each.   So basically each job will run in its own task 
slot/TM, and hence each job would run in its own JVM process.  The job runs 
fine, checkpointing regularly and no errors.   However, if I start another 
instance of the program (with different source/sink topics), then within a few 
seconds the first one fails, and enters recovery.   The first one will 
eventually fail (all retries exhausted).   If I try to start the failed job 
again, then the second job would fail within a few seconds.   So basically it 
looked like one job was tripping over the other.   This was especially odd 
since each job was running in essentially its own JVM process (i.e. Task 
Manager / Task Slot).

Looking at the flink logs, I saw this error message: >> " 
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an 
operation with an old epoch. Either there is a newer producer with the same 
transactionalId, or the producer's transaction has been expired by the broker. "

So I looked at the transactionId - and saw that they were of the form:  
"transaction.id = Source: Custom Source -> Map -> Sink: Unamed-", essentially the 
transaction.id is set to the description of the chained operator followed by some 
GUID.   It is not clear to me how the GUID is generated --- but essentially BOTH 
my jobs were using the same transaction.id!

If I understand correctly, Flink uses a pool of KafkaProducers.  Each 
KafkaProducer within the pool has a transaction.id associated with it.   I think 
what is happening is that each of my jobs has its own pool of KafkaProducers.  
However, each producer in both pools essentially has the same ID.  So like 
JobA.Pool: {P1, P2, P3},   JobB.Pool: {P1, P2, P3}.   This sounds like it would 
not be a problem since each pool will live in its own JVM process.  But since it 
does break, my conjecture is this -- with the way 2-phase commit works, in the 
_commit_ phase, I believe the JM sends a signal to each operator to commit its 
state.   My guess is that since the IDs collide, the Producer in one pool is told 
to commit the transaction with an epoch for a producer in the other pool which 
happens to be less than the last epoch for it.   Example P1 (in Job A) gets a 
message to commit with epoch 0 that is actually meant for P1 (in Job B).   The 
only other explanation I can think of is that these pools are in fact shared 
between task managers -- but that's really hard to believe.

Is my understanding correct?

I was able to solve this by simply naming one of my operators so that the 
transaction.id will be unique for each job.   Example,
JobA  transaction.id = "Source: Custom Source -> (JobA) -> Sink: unamed-guid"
JobB  transaction.id = "Source: Custom Source -> (JobB) -> Sink: unamed-guid"

Re: Schema Evolution on Dynamic Schema

2019-03-08 Thread Rong Rong
Hi Shahar,

1. Are you saying that the incoming data source is published as JSON
and you have a customized Pojo source function / table source that converts
it? In that case it is you who maintains the schema evolution support, am I
correct? For Avro I think you can refer to [1].
2. If you change the SQL, you will have to recompile and rerun your job.
This means the new compilation of the SQL will yield correct logic to run
against your new schema. I don't foresee this being an issue. For the
second problem: yes, it is your customized serialization sink function's
responsibility to convert Row into the output class objects. I am not sure
whether [2] is the piece of code you are looking for, but if you are using
Avro you might be able to leverage it.

If you are sticking with your own format of generated/dynamic class, you
might have to create that in your custom source/sink table.
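
As a rough illustration of a name-based mapping (only a sketch, not the code you
described): the Table schema exposes the field names in the same order as the Row
positions, so you can capture them once and resolve fields by name instead of
hard-coding indices:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

Table result = tableEnv.sqlQuery(
        "SELECT a, MAX(b) AS maxB, MAX(c) AS maxC FROM registered_table GROUP BY a");
String[] fieldNames = result.getSchema().getFieldNames();   // ["a", "maxB", "maxC"]

tableEnv.toRetractStream(result, Row.class)
        .filter(t -> t.f0)                                  // keep only the "add" records
        .map(t -> {
            Row row = t.f1;
            Map<String, Object> byName = new HashMap<>();
            for (int i = 0; i < fieldNames.length; i++) {
                byName.put(fieldNames[i], row.getField(i)); // position resolved via the schema
            }
            return byName;                                  // feed a name-keyed view to the Pojo mapper
        })
        .returns(Types.MAP(Types.STRING, Types.GENERIC(Object.class)));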

Thanks,
Rong

[1]
https://github.com/apache/flink/tree/release-1.7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro
[2]
https://github.com/apache/flink/blob/release-1.7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java#L170

On Thu, Mar 7, 2019 at 11:20 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> Thanks for the response Rong. Would be happy to clarify more.
> So there are two possible changes that could happen:
>
>1. There could be a change in the incoming source schema. Since
>there's a deserialization phase here (JSON -> Pojo) i expect a couple of
>options. Backward compatible changes to the JSON should not have an impact
>(as the Pojo is the same), however we might want to change the Pojo which i
>believe is a state evolving action. I do want to migrate the Pojo to Avro -
>will that suffice for Schema evolution feature to work?
>2. The other possible change is the SQL select fields change, as
>mention someone could add/delete/change-order another field to the SQL
>Select. I do see this as an issue per the way i transform the Row object to
>the dynamically generated class. This is done today through indices of the
>class fields and the ones of the Row object. This seems like an issue for
>when for example a select field is added in the middle and now there's an
>older Row which fields order is not matching the (new) generated Class
>fields order. I'm thinking of how to solve that one - i imagine this is not
>something the schema evolution feature can solve (am i right?). im thinking
>on whether there is a way i can transform the Row object to my generated
>class by maybe the Row's column names corresponding to the generated class
>field names, though i don't see Row object has any notion of column names.
>
> Would love to hear your thoughts. If you want me to paste some code here i
> can do so.
>
> Shahar
>
> On Thu, Mar 7, 2019 at 10:40 AM Rong Rong  wrote:
>
>> Hi Shahar,
>>
>> I wasn't sure which schema are you describing that is going to "evolve"
>> (is it the registered_table? or the output sink?). It will be great if you
>> can clarify more.
>>
>> For the example you provided, IMO it is more considered as logic change
>> instead of schema evolution:
>> - if you are changing max(c) to max(d) in your query. I don't think this
>> qualifies as schema evolution.
>> - if you are adding another column "max(d)" to your query along with your
>> existing "max(c)" that might be considered as a backward compatible change.
>> However, either case you will have to restart your logic, you can also
>> consult how state schema evolution [1], and there are many other problems
>> that can be tricky as well[2,3].
>>
>> Thanks,
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
>> [2]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-operator-schema-evolution-savepoint-deserialization-fail-td23079.html
>> [3]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepointing-with-Avro-Schema-change-td19290.html#a19293
>>
>>
>> On Wed, Mar 6, 2019 at 12:52 PM shkob1 
>> wrote:
>>
>>> Hey,
>>>
>>> My job is built on SQL that is injected as an input to the job. so lets
>>> take
>>> an example of
>>>
>>> Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a
>>>
>>> (side note: in order for the state not to grow indefinitely i'm
>>> transforming
>>> to a retracted stream and filtering based on a custom trigger)
>>>
>>> In order to get the output as a Json format i basically created a way to
>>> dynamically generate a class and registering it to the class loader, so
>>> when
>>> transforming to the retracted stream im doing something like:
>>>
>>> Table result = tableEnv.sqlQuery(sqlExpression);
>>> tableEnv.toRetractStream(result, Row.class, config)
>>> .filter(tuple -> tuple.f0)
>>> .map(new RowToDynamicClassMapper(sqlSelectFields))
>>> 

Flink 1.7.1 KafkaProducer error using Exactly Once semantic

2019-03-08 Thread Timothy Victor
Yesterday I came across a weird problem when attempting to run 2 nearly
identical jobs on a cluster.  I was able to solve it (or rather workaround
it), but am sharing here so we can consider a potential fix in Flink's
KafkaProducer code.

My scenario is as follows.  I have a Flink program that reads from a Kafka
topic, does a simple Map() operation and writes to a Kafka topic with
exactly once semantic.  The source and sink topics are configurable, and
the Map() operation is also configurable (i.e. based on CLI arguments it
can choose between a set of Map() operations).  The job does not name any
of its operators (this part sounds trivial, right?...but read on).   I run
an instance of this program (job) on a Flink standalone cluster running
1.7.1.  The cluster has 12 TMs, each with 1 slot each.   So basically each
job will run in its own task slot/TM, and hence each job would run in its
own JVM process.  The job runs fine, checkpointing regularly and no
errors.   However, if I start another instance of the program (with
different source/sink topics), then within a few seconds the first one
fails, and enters recovery.   The first one will eventually fail (all
retries exhausted).   If I try to start the failed job again, then the
second job would fail within a few seconds.   So basically it looked like
one job was tripping over the other.   This was especially odd since each
job was running in essentially its own JVM process (i.e. Task Manager /
Task Slot).

Looking at the flink logs, I saw this error message: >> "
org.apache.kafka.common.errors.ProducerFencedException:
Producer attempted an operation with an old epoch. Either there is a newer
producer with the same transactionalId, or the producer's transaction has
been expired by the broker. "

So I looked at the transactionId - and saw that they were of the form:  "
transaction.id = Source: Custom Source -> Map -> Sink: Unamed-",
essentially the transaction.id is set to the description of the chained
operator followed by some GUID.   It is not clear to me how the GUID is
generated --- but essentially BOTH my jobs were using the same
transaction.id!

If I understand correctly, Flink uses a pool of KafkaProducers.  Each
KafkaProducer within the pool has a transaction.id associated with it.   I
think what is happening is that each of my jobs has its own pool of
KafkaProducers.  However, each producer in both pools essentially has the
same ID.  So like JobA.Pool: {P1, P2, P3},   JobB.Pool: {P1, P2, P3}.
 This sounds like it would not be a problem since each pool will live in
its own JVM process.  But since it does break, my conjecture is this --
with the way 2-phase commit works, in the _commit_ phase, I believe the JM
sends a signal to each operator to commit its state.   My guess is that
since the IDs collide, the Producer in one pool is told to commit the
transaction with an epoch for a producer in the other pool which happens to
be less than the last epoch for it.   Example P1 (in Job A) gets a message
to commit with epoch 0 that is actually meant for P1 (in Job B).   The only
other explanation I can think of is that these pools are in fact shared
between task managers -- but that's really hard to believe.

Is my understanding correct?

I was able to solve this by simply naming one of my operators so that the
transaction.id will be unique for each job.   Example,
JobA  transaction.id = "Source: Custom Source -> (JobA) -> Sink:
unamed-guid"
JobB transaction.id = "Source: Custom Source -> (JobB) -> Sink: unamed-guid"

After I did this - both jobs run successfully.
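
In code, the workaround amounts to the following (a sketch assuming the Kafka
0.11 connector; the topics, properties, map function and environment setup are
placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "sink-topic",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

env.addSource(new FlinkKafkaConsumer011<>("source-topic", new SimpleStringSchema(), props))
   .map(new MyMapFunction())
   .name("(JobA)")        // the per-job operator name makes the derived transaction.id prefix unique
   .addSink(producer);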

I think a good improvement would be to _not_ use the job graph description
as the transaction ID.   Maybe a simple approach is to require the user to
provide a pool identifier when using Exactly Once with Kafka.  At least
this would make it clear.

Thanks

Tim


Intermittent KryoException

2019-03-08 Thread Scott Sue
Hi,

When running our job, we’re seeing sporadic KryoExceptions.  I’m new to this 
area of Flink, so I’m not exactly sure what I should look out for.  From my 
understanding, Kryo is the default serializer for generic types, and whilst 
there is a potential performance penalty with using Kryo, it should be able to 
serialize / deserialize all objects without fail?

Another point is that our object is mutated as it runs through the different 
operators; could periodic checkpointing be a cause of the issues below?

We are currently running Flink 1.7.1
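
For context, this is roughly how the generic/Kryo fallback can be surfaced or
overridden on the ExecutionConfig (only a sketch of options I'm aware of, not
something the job currently does; the serializer choice is just an example):

import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();

// Fail fast if any type would fall back to the generic Kryo serializer.
config.disableGenericTypes();

// Or pin an explicit Kryo serializer for the type that shows up in the trace below.
config.registerTypeWithKryoSerializer(
        com.celertech.analytics.bo.AnalyticsDataJsonMessage.class, JavaSerializer.class);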


11:02:43,075 INFO  org.apache.flink.runtime.taskmanager.Task
 - Window(ProcessingTimeSessionWindows(1), ProcessingTimeTrigger, 
CoGroupWindowFunction) -> Flat Map -> Sink: Unnamed (1/1) (a83e88eaf06490de
c8326e4d9bd0ed26) switched from RUNNING to FAILED.
TimerException{com.esotericsoftware.kryo.KryoException: 
java.lang.ArrayIndexOutOfBoundsException: 1024
Serialization trace:
payload (com.celertech.analytics.bo.AnalyticsDataJsonMessage)}
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.ArrayIndexOutOfBoundsException: 1024
Serialization trace:
payload (com.celertech.analytics.bo.AnalyticsDataJsonMessage)
at 
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224)
at 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:538)
at 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:507)
at 
org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
at 
org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
at 
org.apache.flink.runtime.state.heap.AbstractHeapAppendingState.getInternal(AbstractHeapAppendingState.java:57)
at 
org.apache.flink.runtime.state.heap.HeapListState.get(HeapListState.java:85)
at 
org.apache.flink.runtime.state.heap.HeapListState.get(HeapListState.java:43)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:235)
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
... 7 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1024
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.getStash(IdentityObjectIntMap.java:256)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.get(IdentityObjectIntMap.java:247)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getWrittenId(MapReferenceResolver.java:28)
at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:619)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:564)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:84)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at 
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
... 22 more


Regards,
Scott







??????sql-client batch ????????????

2019-03-08 Thread yuess_coder
 


 
------
??: "Zhenghua Gao"
: 2019??3??8??(??) 10:57
??: "user-zh";
: Re: sql-client batch 


??DebugBatchCsvTableFactory??
BatchCompatibleTableSinkFactory ?? BatchTableSinkFactory??

/**
 * A CSV table factory.
 */
public class CsvTableFactory implements
   StreamTableSourceFactory,
   BatchTableSourceFactory,
   StreamTableSinkFactory,
   BatchCompatibleTableSinkFactory {

 ExternalTableUtil.scala(line 111) ??Java SPI 
??Factory
BatchTableSinkFactory??

/**
  * Converts an [[CatalogTable]] instance to a [[TableSink]] instance
  *
  * @param name  name of the table
  * @param externalTable the [[CatalogTable]] instance to convert
  * @param isStreaming   is in streaming mode or not.
  * @return
  */
def toTableSink(
 name: String,
 externalTable: CatalogTable,
 isStreaming: Boolean): TableSink[_] = {

  val tableProperties: TableProperties = generateTableProperties(name,
externalTable, isStreaming)
  if (isStreaming) {
val tableFactory =
TableFactoryService.find(classOf[StreamTableSinkFactory[_]],
  getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
tableFactory.createStreamTableSink(tableProperties.toKeyLowerCase.toMap)
  } else {
val tableFactory =
TableFactoryService.find(classOf[BatchTableSinkFactory[_]],
  getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
tableFactory.createBatchTableSink(tableProperties.toKeyLowerCase.toMap)
  }
}

?? CsvTableFactory?? 
NoMatchingTableFactoryException

 BatchCompatibleTableSinkFactory
??connector??()fix??
 
TableFactoryUtil(line97-108) BatchTableSinkFactory ??
BatchCompatibleTableSinkFactory ??patch??

NPE??

On Thu, Mar 7, 2019 at 5:28 PM yuess_coder <642969...@qq.com> wrote:

> ??debug
>
>
> # Define tables here such as sources, sinks, views, or temporal tables.
>
>
> tables: [] # empty list
>
>
> # Define scalar, aggregate, or table functions here.
>
>
> functions: [] # empty list
>
>
>
>
> # Execution properties allow for changing the behavior of a table program.
>
>
> execution:
>   # 'batch' or 'streaming' execution
>   type: batch
>   # allow 'event-time' or only 'processing-time' in sources
>   time-characteristic: event-time
>   # interval in ms for emitting periodic watermarks
>   periodic-watermarks-interval: 200
>   # 'changelog' or 'table' presentation of results
>   result-mode: table
>   # maximum number of maintained rows in 'table' presentation of results
>   max-table-result-rows: 100
>   # parallelism of the program
>   parallelism: 1
>   # maximum parallelism
>   max-parallelism: 128
>   # minimum idle state retention in ms
>   min-idle-state-retention: 0
>   # maximum idle state retention in ms
>   max-idle-state-retention: 0
>   # controls how table programs are restarted in case of a failures
>   restart-strategy:
> # strategy type
> # possible values are "fixed-delay", "failure-rate", "none", or
> "fallback" (default)
> type: fallback
>
>
>
>
>
> #==
> # Deployment properties
>
> #==
>
>
> # Deployment properties allow for describing the cluster to which table
> # programs are submitted to.
>
>
> deployment:
>   # general cluster communication timeout in ms
>   response-timeout: 5000
>   # (optional) address from cluster to gateway
>   gateway-address: ""
>   # (optional) port from cluster to gateway
>   gateway-port: 0
>
>
>
>
>
> #==
> # Catalog properties
>
> #==
> #catalogs:
> #  - name: myhive
> #catalog:
> #  type: hive
> #  connector:
> #hive.metastore.uris: thrift://localhost:9083
> #  is-default: false
> #  default-database: default
>
>
>
> --  --
> ??: "Zhenghua Gao";
> : 2019??3??7??(??) 3:13
> ??: "user-zh";
>
> : Re: sql-client batch 
>
>
>
> ?? tableSink nulldebug??
>  conf/sql-client-defaults.yaml 
>
> On Thu, Mar 7, 2019 at 1:31 PM yuess_coder <642969...@qq.com> wrote:
>
> > ??
> >
> >
> >
> >
> > java.lang.NullPointerException
> >   at
> >
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1300)
> >   at
> >

What does "Continuous incremental cleanup" mean in Flink 1.8 release notes

2019-03-08 Thread Tony Wei
Hi everyone,

I read the Flink 1.8 release notes about state [1], and it said

*Continuous incremental cleanup of old Keyed State with TTL*
> We introduced TTL (time-to-live) for Keyed state in Flink 1.6 (FLINK-9510
> ). This feature allowed
> to clean up and make inaccessible keyed state entries when accessing them.
> In addition state would now also being cleaned up when writing a
> savepoint/checkpoint.
> Flink 1.8 introduces continous cleanup of old entries for both the RocksDB
> state backend (FLINK-10471
> ) and the heap state
> backend (FLINK-10473 ).
> This means that old entries (according to the ttl setting) are continously
> being cleanup up.


I'm not familiar with TTL's implementation in Flink 1.6 or with the new
features introduced in Flink 1.8. After reading the release notes, I don't
understand the difference between these two release versions. Did they change
the outcome of the TTL feature, provide new TTL features, or just change how
the TTL mechanism is executed?

Could you give me more references to learn about it? A simple example
to illustrate it would be much appreciated. Thank you.

Best,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.8.html#state