Re: Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Yun Tang
Hi Kevin,

Currently, you can check the logs to see when a task starts and finishes restoring [1] to
know how much time is spent on the task side. Flink 1.13 also tries to expose the stages of
task initialization [2], which might help you.


state.backend.rocksdb.metrics.total-sst-files-size should correctly describe the
SST file size. There are several reasons why the savepoint size can be larger than
the total SST file size:

  1.  SST files are compressed with the snappy format by default, while savepoints
are not.
  2.  SST files can save space because entries sharing the same key prefix share bytes.
  3.  Some contents are still in the in-memory write buffer and not yet flushed.
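
As a reference, here is a minimal sketch (assuming Flink 1.12 and only the configuration
keys named in this thread plus state.backend.incremental) of enabling the SST-file-size
metric, incremental checkpoints, and the snapshot compression switch related to point 1.
This is a sketch, not the only way to set these options:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateSizeMetricsSketch {

    public static StreamExecutionEnvironment create() {
        Configuration config = new Configuration();
        // expose the aggregated size of all live SST files as an operator-scoped metric
        config.setString("state.backend.rocksdb.metrics.total-sst-files-size", "true");
        // incremental checkpoints upload only new or changed SST files
        config.setString("state.backend.incremental", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, config);
        // compress keyed-state snapshots (savepoints / full snapshots)
        env.getConfig().setUseSnapshotCompression(true);
        return env;
    }
}

In a real deployment these keys would normally live in flink-conf.yaml rather than be set
programmatically.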

However, the difference here is really huge. Have you ever logged into the machines
holding keyed state to see how much space is actually occupied? And what is the incremental
checkpoint size of your job? Have you enabled TTL for the state?


[1] https://issues.apache.org/jira/browse/FLINK-19013
[2] https://issues.apache.org/jira/browse/FLINK-17012

Best
Yun Tang



From: Guowei Ma 
Sent: Thursday, April 1, 2021 11:57
To: Kevin Lam 
Cc: user ; Yun Tang 
Subject: Re: Measuring the Size of State, Savepoint Size vs. Restore time

Hi, Kevin

If you use RocksDB and want to know the size of the data on disk, I think that is
the right metric. But the SST files might include some expired data, and some data
in memory is not included in the SST files yet. In general I think it can
reflect the state size of your application.

I think there is no metric for the time spent on restoring from a
savepoint.

As for why there is a huge difference between the size of sst and the size of 
savepoint, I think @Yun can give some detailed insights.

Best,
Guowei


On Thu, Apr 1, 2021 at 1:38 AM Kevin Lam  wrote:
Hi all,

We're interested in doing some analysis on how the size of our savepoints and 
state affects the time it takes to restore from a savepoint. We're running 
Flink 1.12 and using RocksDB as a state backend, on Kubernetes.

What is the best way to measure the size of a Flink Application's state? Is
state.backend.rocksdb.metrics.total-sst-files-size the right thing to look at?

We tried looking at state.backend.rocksdb.metrics.total-sst-files-size for all 
our operators, after restoring from a savepoint, and we noticed that the sum of 
all the sst files sizes is much much smaller than the total size of our 
savepoint (7GB vs 10TB).  Where does that discrepancy come from?

Do you have any general advice on correlating savepoint size with restore times?

Thanks in advance!


Unsubscribe

2021-03-31 Thread Chouchou Mei
Unsubscribe


Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Dawid Wysakowicz
Hi all,

@Kurt @Arvid I think it's fine to merge those two, as they are pretty
much finished. We can wait for those two before creating the RC0.

@Leonard Personally I'd be ok with 3 more days for that single PR. I
find the request reasonable and I second that it's better to have a
proper review rather than rush an unfinished feature and try to fix it
later. Moreover, it got broader support. Unless somebody else objects, I
think we can merge this PR later and include it in RC1.

Best,

Dawid

On 01/04/2021 08:39, Arvid Heise wrote:
> Hi Dawid and Guowei,
>
> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We
> are pretty much just waiting for AZP to turn green, it's separate from
> other components, and it's a super useful feature for Flink users.
>
> Best,
>
> Arvid
>
> [1] https://github.com/apache/flink/pull/15054
> 
>
> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  > wrote:
>
> Hi Guowei and Dawid,
>
> I want to request permission to merge this feature [1]; it's a
> useful improvement to the SQL client and won't affect
> other components too much. We were planning to merge it yesterday but
> met some tricky multi-process issue which
> had a very high chance of hanging the tests. It took us a while
> to find out the root cause and fix it.
>
> Since we're not too far past the feature freeze and RC0 has not been
> created yet, I would like to include this
> in 1.13.
>
> [1] https://issues.apache.org/jira/browse/FLINK-20320
> 
>
> Best,
> Kurt
>
>
> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  > wrote:
>
> Hi, community:
>
> Friendly reminder that today (3.31) is the last day of feature
> development. Under normal circumstances, you will not be able
> to submit new features from tomorrow (4.1). Tomorrow we will
> create 1.13.0-rc0 for testing, welcome to help test together.
> After the test is relatively stable, we will cut the
> release-1.13 branch.
>
> Best,
> Dawid & Guowei
>
>
> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann
> mailto:trohrm...@apache.org>> wrote:
>
> +1 for the 31st of March for the feature freeze.
>
> Cheers,
> Till
>
> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger
> mailto:rmetz...@apache.org>> wrote:
>
> > +1 for March 31st for the feature freeze.
> >
> >
> >
> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz
> mailto:dwysakow...@apache.org>>
> > wrote:
> >
> > > Thank you Thomas! I'll definitely check the issue you
> linked.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 23/03/2021 20:35, Thomas Weise wrote:
> > > > Hi Dawid,
> > > >
> > > > Thanks for the heads up.
> > > >
> > > > Regarding the "Rebase and merge" button. I find that
> merge option
> > useful,
> > > > especially for small simple changes and for
> backports. The following
> > > should
> > > > help to safeguard from the issue encountered previously:
> > > > https://github.com/jazzband/pip-tools/issues/1085
> 
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
> > dwysakow...@apache.org 
> > > >
> > > > wrote:
> > > >
> > > >> Hi devs, users!
> > > >>
> > > >> 1. *Feature freeze date*
> > > >>
> > > >> We are approaching the end of March which we agreed
> would be the time
> > > for
> > > >> a Feature Freeze. From the knowledge I've gathered so
> far it still seems
> > > to
> > > >> be a viable plan. I think it is a good time to
> agree on a particular
> > > date,
> > > >> when it should happen. We suggest *(end of day
> CEST) March 31st*
> > > >> (Wednesday next week) as the feature freeze time.
> > > >>
> > > >> Similarly as last time, we want to create RC0 on
> the day after the
> > > feature
> > > >> freeze, to make sure the RC creation process is
> running smoothly, and
> > to
> > > >> have a common testing reference point.
> > > >>
> 

Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Arvid Heise
Hi Dawid and Guowei,

I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We are
pretty much just waiting for AZP to turn green, it's separate from other
components, and it's a super useful feature for Flink users.

Best,

Arvid

[1] https://github.com/apache/flink/pull/15054

On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:

> Hi Guowei and Dawid,
>
> I want to request permission to merge this feature [1]; it's a useful
> improvement to the SQL client and won't affect
> other components too much. We were planning to merge it yesterday but met some
> tricky multi-process issue which
> had a very high chance of hanging the tests. It took us a while to find
> out the root cause and fix it.
>
> Since we're not too far past the feature freeze and RC0 has not been created
> yet, I would like to include this
> in 1.13.
>
> [1] https://issues.apache.org/jira/browse/FLINK-20320
>
> Best,
> Kurt
>
>
> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:
>
>> Hi, community:
>>
>> Friendly reminder that today (3.31) is the last day of feature
>> development. Under normal circumstances, you will not be able to submit new
>> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
>> testing, welcome to help test together.
>> After the test is relatively stable, we will cut the release-1.13 branch.
>>
>> Best,
>> Dawid & Guowei
>>
>>
>> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
>> wrote:
>>
>>> +1 for the 31st of March for the feature freeze.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
>>> wrote:
>>>
>>> > +1 for March 31st for the feature freeze.
>>> >
>>> >
>>> >
>>> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
>>> dwysakow...@apache.org>
>>> > wrote:
>>> >
>>> > > Thank you Thomas! I'll definitely check the issue you linked.
>>> > >
>>> > > Best,
>>> > >
>>> > > Dawid
>>> > >
>>> > > On 23/03/2021 20:35, Thomas Weise wrote:
>>> > > > Hi Dawid,
>>> > > >
>>> > > > Thanks for the heads up.
>>> > > >
>>> > > > Regarding the "Rebase and merge" button. I find that merge option
>>> > useful,
>>> > > > especially for small simple changes and for backports. The
>>> following
>>> > > should
>>> > > > help to safeguard from the issue encountered previously:
>>> > > > https://github.com/jazzband/pip-tools/issues/1085
>>> > > >
>>> > > > Thanks,
>>> > > > Thomas
>>> > > >
>>> > > >
>>> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
>>> > dwysakow...@apache.org
>>> > > >
>>> > > > wrote:
>>> > > >
>>> > > >> Hi devs, users!
>>> > > >>
>>> > > >> 1. *Feature freeze date*
>>> > > >>
>>> > > >> We are approaching the end of March which we agreed would be the
>>> time
>>> > > for
>>> > > >> a Feature Freeze. From the knowledge I've gathered so far it still
>>> seems
>>> > > to
>>> > > >> be a viable plan. I think it is a good time to agree on a
>>> particular
>>> > > date,
>>> > > >> when it should happen. We suggest *(end of day CEST) March 31st*
>>> > > >> (Wednesday next week) as the feature freeze time.
>>> > > >>
>>> > > >> Similarly as last time, we want to create RC0 on the day after the
>>> > > feature
>>> > > >> freeze, to make sure the RC creation process is running smoothly,
>>> and
>>> > to
>>> > > >> have a common testing reference point.
>>> > > >>
> > > >> Having said that, let us repeat, after Robert & Dian from the
> previous
> > > >> release, what a Feature Freeze means:
> > > >>
> > > >> *B) What does feature freeze mean?* After the feature freeze, no new
>>> new
>>> > > >> features are allowed to be merged to master. Only bug fixes and
>>> > > >> documentation improvements.
>>> > > >> The release managers will revert new feature commits after the
>>> feature
>>> > > >> freeze.
> > > >> Rationale: The goal of the feature freeze phase is to improve the
>>> > system
>>> > > >> stability by addressing known bugs. New features tend to
>>> introduce new
>>> > > >> instabilities, which would prolong the release process.
>>> > > >> If you need to merge a new feature after the freeze, please open a
>>> > > >> discussion on the dev@ list. If there are no objections by a PMC
>>> > member
>>> > > >> within 48 (workday)hours, the feature can be merged.
>>> > > >>
>>> > > >> 2. *Merge PRs from the command line*
>>> > > >>
>>> > > >> In the past releases it was quite frequent around the Feature
>>> Freeze
>>> > > date
>>> > > >> that we ended up with a broken main branch that either did not
>>> compile
>>> > > or
>>> > > >> there were failing tests. It was often due to concurrent merges
>>> to the
>>> > > main
>>> > > >> branch via the "Rebase and merge" button. To overcome the problem
>>> we
>>> > > would
>>> > > >> like to suggest only ever merging PRs from a command line. Thank
>>> you
>>> > > >> Stephan for the idea! The suggested workflow would look as
>>> follows:
>>> > > >>
>>> > > >>1. Pull the change and rebase on the current main branch
>>> > > >>2. Build the project (e.g. from IDE, which should be faster
>>> than
>>> > > >

Re: IO benchmarking

2021-03-31 Thread Matthias Pohl
For 2., there are also efforts to expose the state and operator
initialization through the logs (see FLINK-17012 [1]).
For 3., the TypeSerializer [2] might be another point of interest. It is
used to serialize specific types. Other than that, the state
serialization depends heavily on the state backend used. Hence, you may want to
look into RocksDB's SSTables if you rely on it as a state backend.

[1] https://issues.apache.org/jira/browse/FLINK-17012
[2]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
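
As a rough illustration (using a hypothetical POJO "MyEvent", not tied to any particular
job), the following sketch shows how a TypeSerializer like [2] is obtained for a type and
used to turn a record into bytes, which is essentially what happens per state entry when a
snapshot is written:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class SerializerSketch {

    /** A hypothetical POJO; stands in for whatever type is kept in state. */
    public static class MyEvent {
        public String id;
        public long timestamp;
    }

    public static byte[] toBytes(MyEvent event) throws java.io.IOException {
        // Flink derives the serializer from the type information of the class
        TypeSerializer<MyEvent> serializer =
                TypeInformation.of(MyEvent.class).createSerializer(new ExecutionConfig());
        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize(event, out);
        return out.getCopyOfBuffer();
    }
}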

On Thu, Apr 1, 2021 at 1:27 AM deepthi Sridharan <
deepthi.sridha...@gmail.com> wrote:

> Thanks, Matthias. This is very helpful.
>
> Regarding the checkpoint documentation, I was mostly looking for
> information on how states from various tasks get serialized into one (or
> more?) files on persistent storage. I'll check out the code pointers!
>
> On Wed, Mar 31, 2021 at 7:07 AM Matthias Pohl 
> wrote:
>
>> Hi Deepthi,
>> 1. Have you had a look at flink-benchmarks [1]? I haven't used it but it
>> might be helpful.
>> 2. Unfortunately, Flink doesn't provide metrics like that. But you might
>> want to follow FLINK-21736 [2] for future developments.
>> 3. Is there anything specific you are looking for? Unfortunately, I don't
>> know any blogs for a more detailed summary. If you plan to look into the
>> code CheckpointCoordinator [3] might be a starting point. Alternatively,
>> something like MetadataV2V3SerializerBase [4] offers insights into how the
>> checkpoints' metadata is serialized.
>>
>> Best,
>> Matthias
>>
>> [1] https://github.com/apache/flink-benchmarks
>> [2] https://issues.apache.org/jira/browse/FLINK-21736
>> [3]
>> https://github.com/apache/flink/blob/11550edbd4e1874634ec441bde4fe4952fc1ec4e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1493
>> [4]
>> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>>
>> On Tue, Mar 30, 2021 at 8:37 PM deepthi Sridharan <
>> deepthi.sridha...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to set up some benchmarking with a couple of IO options for
>>> saving checkpoints and have a couple of questions :
>>>
>>> 1. Does flink come with any IO benchmarking tools? I couldn't find
>>> any. I was hoping to use those to derive some insights about the storage
>>> performance and extrapolate it for the checkpoint use case.
>>>
>>> 2. Are there any metrics pertaining to restore from checkpoints? The
>>> only metric I can find is the last restore time, but neither the time it
>>> took to read the checkpoints, nor the time it took to restore the
>>> operator/task states seem to be covered. I am using RocksDB, but couldn't
>>> find any metrics relating to how much time it took to restore the state
>>> backend from rocksdb either.
>>>
>>> 3. I am trying to find documentation on how the states are serialized
>>> into the checkpoint files from multiple operators and tasks to tailor the
> >>> testing use case, but can't seem to find any. Are there any blogs that go
>>> into this detail or would reading the code be the only option?
>>>
>>> --
>>> Thanks,
>>> Deepthi
>>>
>>
>
> --
> Regards,
> Deepthi
>


Re: PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Dian Fu
Hi Sumeet,

I think it should be a bug and I have created a ticket,
https://issues.apache.org/jira/browse/FLINK-22082, as a follow-up.

Regards,
Dian


> On Apr 1, 2021, at 12:25 PM, Guowei Ma  wrote:
> 
> Hi, Sumeet
> 
> I am not an expert on PyFlink. But I think @Dian Fu
> might give some insight into this problem.
> 
> Best,
> Guowei
> 
> 
> On Thu, Apr 1, 2021 at 12:12 AM Sumeet Malhotra  > wrote:
> Cross posting from StackOverlow here:
> 
> https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array
>  
> 
> 
> Any pointers are appreciated!
> 
> Thanks,
> Sumeet



Flink Taskmanager failure recovery and large state

2021-03-31 Thread Yaroslav Tkachenko
Hi everyone,

I'm wondering if people have experienced issues with Taskmanager failure
recovery when dealing with a lot of state.

I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints and
checkpoints. ~150 task managers with 4 slots each.

When I run a pipeline without much state and kill one of the
taskmanagers, it takes a few minutes to recover (I see a few restarts), but
eventually when a new replacement taskmanager is registered with the
jobmanager things go back to healthy.

But when I run a pipeline with a lot of state (1TB+) and kill one of the
taskmanagers, the pipeline never recovers, even after the replacement
taskmanager has joined. It just enters an infinite loop of restarts and
failures.

On the jobmanager, I see an endless loop of state transitions: RUNNING
-> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
It stays in RUNNING for a few seconds, but then transitions into FAILED
with a message like this:


22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph - 
(569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection reset by peer (connection to '
10.30.10.53/10.30.10.53:45789')
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
...
Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
readAddress(..) failed: Connection reset by peer


Which, I guess, means a failed Taskmanager. And since there are not enough
task slots to run it goes into this endless loop again. It's never the same
Taskmanager that fails.



On the Taskmanager side, things look more interesting. I see a variety of
exceptions:


org.apache.flink.runtime.taskmanager.Task -  (141/624)#7
(6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution
attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.


also


WARNING: Failed read retry #1/10 for
'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'.
Sleeping...
java.nio.channels.ClosedByInterruptException
at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown
Source)
at
java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown
Source)
at
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
at
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
at java.base/java.io.DataInputStream.read(Unknown Source)
at
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at java.base/java.io.InputStream.read(Unknown Source)
at
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
...


and


SEVERE: Interrupted while sleeping before retry. Giving up after 1/10
retries for
'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
20:52:46.894 [ (141/624)#7] ERROR
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder -
Caught unexpected exception.
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
at
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140)
~[flink-dist_2.12-1.12.0.jar:1.12.0]


also


20:52:46.895 [ (141/624)#7] WARN
 org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
Exception while restoring keyed state backend for
KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from
alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected
exception.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
...


and a few of


Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download
data for state handles.
at
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
...
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: 

Re: PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Guowei Ma
Hi, Sumeet

I am not an expert on PyFlink. But I think @Dian Fu
might give some insight into this problem.

Best,
Guowei


On Thu, Apr 1, 2021 at 12:12 AM Sumeet Malhotra 
wrote:

> Cross posting from StackOverlow here:
>
>
> https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array
>
> Any pointers are appreciated!
>
> Thanks,
> Sumeet
>


Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Kurt Young
Hi Guowei and Dawid,

I want to request permission to merge this feature [1]; it's a useful
improvement to the SQL client and won't affect
other components too much. We were planning to merge it yesterday but met some
tricky multi-process issue which
had a very high chance of hanging the tests. It took us a while to find
out the root cause and fix it.

Since we're not too far past the feature freeze and RC0 has not been created
yet, I would like to include this
in 1.13.

[1] https://issues.apache.org/jira/browse/FLINK-20320

Best,
Kurt


On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:

> Hi, community:
>
> Friendly reminder that today (3.31) is the last day of feature
> development. Under normal circumstances, you will not be able to submit new
> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
> testing, welcome to help test together.
> After the test is relatively stable, we will cut the release-1.13 branch.
>
> Best,
> Dawid & Guowei
>
>
> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
> wrote:
>
>> +1 for the 31st of March for the feature freeze.
>>
>> Cheers,
>> Till
>>
>> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
>> wrote:
>>
>> > +1 for March 31st for the feature freeze.
>> >
>> >
>> >
>> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
>> dwysakow...@apache.org>
>> > wrote:
>> >
>> > > Thank you Thomas! I'll definitely check the issue you linked.
>> > >
>> > > Best,
>> > >
>> > > Dawid
>> > >
>> > > On 23/03/2021 20:35, Thomas Weise wrote:
>> > > > Hi Dawid,
>> > > >
>> > > > Thanks for the heads up.
>> > > >
>> > > > Regarding the "Rebase and merge" button. I find that merge option
>> > useful,
>> > > > especially for small simple changes and for backports. The following
>> > > should
>> > > > help to safeguard from the issue encountered previously:
>> > > > https://github.com/jazzband/pip-tools/issues/1085
>> > > >
>> > > > Thanks,
>> > > > Thomas
>> > > >
>> > > >
>> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
>> > dwysakow...@apache.org
>> > > >
>> > > > wrote:
>> > > >
>> > > >> Hi devs, users!
>> > > >>
>> > > >> 1. *Feature freeze date*
>> > > >>
>> > > >> We are approaching the end of March which we agreed would be the
>> time
>> > > for
>> > > >> a Feature Freeze. From the knowledge I've gathered so far it still
>> seems
>> > > to
>> > > >> be a viable plan. I think it is a good time to agree on a
>> particular
>> > > date,
>> > > >> when it should happen. We suggest *(end of day CEST) March 31st*
>> > > >> (Wednesday next week) as the feature freeze time.
>> > > >>
>> > > >> Similarly as last time, we want to create RC0 on the day after the
>> > > feature
>> > > >> freeze, to make sure the RC creation process is running smoothly,
>> and
>> > to
>> > > >> have a common testing reference point.
>> > > >>
>> > > >> Having said that, let us repeat, after Robert & Dian from the
>> previous
>> > > >> release, what a Feature Freeze means:
>> > > >>
>> > > >> *B) What does feature freeze mean?* After the feature freeze, no new
>> > > >> features are allowed to be merged to master. Only bug fixes and
>> > > >> documentation improvements.
>> > > >> The release managers will revert new feature commits after the
>> feature
>> > > >> freeze.
>> > > >> Rationale: The goal of the feature freeze phase is to improve the
>> > system
>> > > >> stability by addressing known bugs. New features tend to introduce
>> new
>> > > >> instabilities, which would prolong the release process.
>> > > >> If you need to merge a new feature after the freeze, please open a
>> > > >> discussion on the dev@ list. If there are no objections by a PMC
>> > member
>> > > >> within 48 (workday)hours, the feature can be merged.
>> > > >>
>> > > >> 2. *Merge PRs from the command line*
>> > > >>
>> > > >> In the past releases it was quite frequent around the Feature
>> Freeze
>> > > date
>> > > >> that we ended up with a broken main branch that either did not
>> compile
>> > > or
>> > > >> there were failing tests. It was often due to concurrent merges to
>> the
>> > > main
>> > > >> branch via the "Rebase and merge" button. To overcome the problem
>> we
>> > > would
>> > > >> like to suggest only ever merging PRs from a command line. Thank
>> you
>> > > >> Stephan for the idea! The suggested workflow would look as follows:
>> > > >>
>> > > >>1. Pull the change and rebase on the current main branch
>> > > >>2. Build the project (e.g. from IDE, which should be faster than
>> > > >>building entire project from cmd) -> this should ensure the
>> project
>> > > compiles
>> > > >>3. Run the tests in the module that the change affects -> this
>> > should
>> > > >>greatly minimize the chances of failing tests
>> > > >>4. Push the change to the main branch
>> > > >>
>> > > >> Let us know what you think!
>> > > >>
>> > > >> Best,
>> > > >>
>> > > >> Guowei & Dawid
>> > > >>
>> > > >>
>> > > >>
>> > >
>> > >
>> >
>>
>


Re: Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Guowei Ma
Hi, Kevin

If you use RocksDB and want to know the size of the data on disk, I think that
is the right metric. But the SST files might include some expired data,
and some data in memory is not included in the SST files yet. In general I
think it can reflect the state size of your application.

I think there is no metric for the time spent on restoring from
a savepoint.

As for why there is a huge difference between the size of sst and the size
of savepoint, I think @Yun can give some detailed insights.

Best,
Guowei


On Thu, Apr 1, 2021 at 1:38 AM Kevin Lam  wrote:

> Hi all,
>
> We're interested in doing some analysis on how the size of our savepoints
> and state affects the time it takes to restore from a savepoint. We're
> running Flink 1.12 and using RocksDB as a state backend, on Kubernetes.
>
> What is the best way to measure the size of a Flink Application's state?
> Is state.backend.rocksdb.metrics.total-sst-files-size
> 
> the right thing to look at?
>
> We tried looking at state.backend.rocksdb.metrics.total-sst-files-size for
> all our operators, after restoring from a savepoint, and we noticed that
> the sum of all the sst files sizes is much much smaller than the total size
> of our savepoint (7GB vs 10TB).  Where does that discrepancy come from?
>
> Do you have any general advice on correlating savepoint size with restore
> times?
>
> Thanks in advance!
>


Re: How to specific key serializer

2021-03-31 Thread 陳昌倬
On Wed, Mar 31, 2021 at 05:33:19PM +0800, Tzu-Li (Gordon) Tai wrote:
> You can try using TypeInfo annotations to specify a TypeInformationFactory
> for your key class [1].
> This allows you to "plug-in" the TypeInformation extracted by Flink for a
> given class. In that custom TypeInformation, you should let it return the
> correct serializer.

Hi Gordon,

Thanks for the tip. We have solved the problem by specifying the
TypeInformation in readKeyedState.
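
For reference, a minimal sketch of that approach (assuming the readKeyedState overload
that accepts explicit TypeInformation; the savepoint path, operator uid, state name, and
key/output types below are all placeholders, not the actual job's):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadKeyedStateSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
                Savepoint.load(env, "s3://bucket/path/to/savepoint", new MemoryStateBackend());

        // explicit key/output TypeInformation instead of relying on type extraction
        DataSet<String> entries = savepoint.readKeyedState(
                "my-operator-uid",
                new KeyReader(),
                Types.STRING,
                Types.STRING);

        entries.print();
    }

    /** Emits every key together with a placeholder value state registered in the job. */
    static class KeyReader extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            // the descriptor name and type must match the state registered in the job
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("some-state", String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + "=" + state.value());
        }
    }
}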


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

2021-03-31 Thread Guowei Ma
Hi, Robert
I think you could try changing "s3://argo-artifacts/" to
"s3a://argo-artifacts/".
This is because `StreamingFileSink` currently only supports the Hadoop-based
S3 filesystem, not the Presto-based one. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#important-considerations
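
A minimal sketch of the adjusted sink (the bucket name is taken from your snippet; the
bucket assigner and output file config from your code are omitted here for brevity):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class S3aSinkSketch {
    public static StreamingFileSink<String> build() {
        return StreamingFileSink
                // s3a:// is served by the flink-s3-fs-hadoop plugin, which provides
                // the recoverable writer the sink needs
                .forRowFormat(new Path("s3a://argo-artifacts/"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    }
}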

Best,
Guowei


On Thu, Apr 1, 2021 at 3:56 AM Robert Cullen  wrote:

> I’m using a local instance of MinIO on my Kubernetes cluster for
> checkpoint/savepoint storage. I’m using this StreamingFileSink
> configuration:
>
>
> final StreamingFileSink> sink =
> StreamingFileSink.forRowFormat(
> new Path("s3://argo-artifacts/"),
> new SimpleStringEncoder Long>>("UTF-8"))
> .withBucketAssigner(new KeyBucketAssigner())
> .withRollingPolicy(OnCheckpointRollingPolicy.build())
> .withOutputFileConfig(config)
> .build();
>
> Anyone know how to fix this exception?
>
> java.lang.UnsupportedOperationException: This s3 file system implementation 
> does not support recoverable writers.
> at 
> org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
>  ~[?:?]
> at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:132)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:70)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>
> --
> Robert Cullen
> 240-475-4490
>


Re: IO benchmarking

2021-03-31 Thread deepthi Sridharan
Thanks, Matthias. This is very helpful.

Regarding the checkpoint documentation, I was mostly looking for
information on how states from various tasks get serialized into one (or
more?) files on persistent storage. I'll check out the code pointers!

On Wed, Mar 31, 2021 at 7:07 AM Matthias Pohl 
wrote:

> Hi Deepthi,
> 1. Have you had a look at flink-benchmarks [1]? I haven't used it but it
> might be helpful.
> 2. Unfortunately, Flink doesn't provide metrics like that. But you might
> want to follow FLINK-21736 [2] for future developments.
> 3. Is there anything specific you are looking for? Unfortunately, I don't
> know any blogs for a more detailed summary. If you plan to look into the
> code CheckpointCoordinator [3] might be a starting point. Alternatively,
> something like MetadataV2V3SerializerBase [4] offers insights into how the
> checkpoints' metadata is serialized.
>
> Best,
> Matthias
>
> [1] https://github.com/apache/flink-benchmarks
> [2] https://issues.apache.org/jira/browse/FLINK-21736
> [3]
> https://github.com/apache/flink/blob/11550edbd4e1874634ec441bde4fe4952fc1ec4e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1493
> [4]
> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>
> On Tue, Mar 30, 2021 at 8:37 PM deepthi Sridharan <
> deepthi.sridha...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to set up some benchmarking with a couple of IO options for
>> saving checkpoints and have a couple of questions :
>>
>> 1. Does flink come with any IO benchmarking tools? I couldn't find any. I
>> was hoping to use those to derive some insights about the storage
>> performance and extrapolate it for the checkpoint use case.
>>
>> 2. Are there any metrics pertaining to restore from checkpoints? The only
>> metric I can find is the last restore time, but neither the time it took to
>> read the checkpoints, nor the time it took to restore the operator/task
>> states seem to be covered. I am using RocksDB, but couldn't find any
>> metrics relating to how much time it took to restore the state backend from
>> rocksdb either.
>>
>> 3. I am trying to find documentation on how the states are serialized
>> into the checkpoint files from multiple operators and tasks to tailor the
>> testing use case, but can't seem to find any. Are there any blogs that go
>> into this detail or would reading the code be the only option?
>>
>> --
>> Thanks,
>> Deepthi
>>
>

-- 
Regards,
Deepthi


Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-03-31 Thread Lu Niu
Hi, Colletta

Thanks for sharing! Do you mind share one stacktrace for that error as
well? Thanks!

Best
Lu

On Sat, Mar 27, 2021 at 5:36 AM Colletta, Edward 
wrote:

>
>
> FYI, we experienced a similar error again: lost leadership, but not due to a
> timeout but rather a disconnect from ZooKeeper.  This time I examined the logs for
> other errors related to ZooKeeper and found that the Kafka cluster that uses the
> same ZooKeeper was also disconnected.
>
>
>
> We run on AWS and this seems to be AWS related.
>
>
>
>
>
> *From:* Xintong Song 
> *Sent:* Sunday, January 31, 2021 9:23 PM
> *To:* user 
> *Subject:* Re: Flink 1.11 job hit error "Job leader lost leadership" or
> "ResourceManager leader changed to new address null"
>
>
>
> *This email is from an external source - **exercise caution regarding
> links and attachments. *
>
>
>
> Hi Colletta,
>
>
>
> This error is kind of expected if the JobMaster / ResourceManager does not
> maintain a stable connection to the ZooKeeper service, which may be caused
> by network issues, GC pause, or unstable ZK service availability.
>
>
>
> By "similar issue", what I meant is I'm not aware of any issue related to
> the upgrading of the ZK version that may cause the leadership loss.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Sun, Jan 31, 2021 at 4:14 AM Colletta, Edward 
> wrote:
>
> “but I'm not aware of any similar issue reported since the upgrading”
>
> For the record, we experienced this same error on Flink 1.11.2 this past
> week.
>
>
>
> *From:* Xintong Song 
> *Sent:* Friday, January 29, 2021 7:34 PM
> *To:* user 
> *Subject:* Re: Flink 1.11 job hit error "Job leader lost leadership" or
> "ResourceManager leader changed to new address null"
>
>
>
> *This email is from an external source - **exercise caution regarding
> links and attachments. *
>
>
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Sat, Jan 30, 2021 at 8:27 AM Xintong Song 
> wrote:
>
> There's indeed a ZK version upgrading during 1.9 and 1.11, but I'm not
> aware of any similar issue reported since the upgrading.
>
> I would suggest the following:
>
> - Turn on the DEBUG log see if there's any valuable details
>
> - Maybe try asking in the Apache Zookeeper community, see if this is a
> known issue.
>
>
> Thank you~
> Xintong Song
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sat, Jan 30, 2021 at 6:47 AM Lu Niu  wrote:
>
> Hi, Xintong
>
>
>
> Thanks for replying. Could it be related to the ZK version? We are a platform
> team at Pinterest in the middle of migrating from 1.9.1 to 1.11. Both 1.9
> and 1.11 jobs talk to the same ZK for JM HA. This problem only surfaced
> in 1.11 jobs. That's why we think it is related to the version upgrade.
>
>
>
> Best
>
> Lu
>
>
>
> On Thu, Jan 28, 2021 at 7:56 PM Xintong Song 
> wrote:
>
> The ZK client side uses 15s connection timeout and 60s session timeout
> in Flink. There's nothing similar to a heartbeat interval configured, which
> I assume is up to ZK's internal implementation. These things have not
> changed in Flink since at least 2017.
>
>
>
> If both ZK client and server complain about timeout, and there's no gc
> issue spotted, I would consider a network instability.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Fri, Jan 29, 2021 at 3:15 AM Lu Niu  wrote:
>
> After checking the log I found the root cause is zk client timeout on TM:
>
> ```
>
> 2021-01-25 14:01:49,600 WARN
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
> session timed out, have not heard from server in 40020ms for sessionid
> 0x404f9ca531a5d6f
> 2021-01-25 14:01:49,610 INFO
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
> session timed out, have not heard from server in 40020ms for sessionid
> 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
> 2021-01-25 14:01:49,711 INFO
> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
> - State change: SUSPENDED
> 2021-01-25 14:01:49,711 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2021-01-25 14:01:49,712 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job
> 27ac39342913d29baac4cde13062c4a4 with leader id
> b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
> 2021-01-25 14:01:49,712 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2021-01-25 14:01:49,712 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
> connection for job 27ac39342913d29baac4cde13062c4a4.
> 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Sink:
> USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
> (d5b5887e639874cb70d7fef939b957b7).
> 2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task -
> 

ARM support

2021-03-31 Thread Rex Fenley
Hello,

We would like to run Flink on ARM yet haven't found any resources
indicating that this is yet possible. We are wondering what the timeline is
for Flink supporting ARM. Given that all Mac Books are moving to ARM and
that AWS is excitedly supporting ARM, it seems important that Flink also
supports running on ARM.

Thank you

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com



s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

2021-03-31 Thread Robert Cullen
I’m using a local instance of MinIO on my Kubernetes cluster for
checkpoint/savepoint storage. I’m using this StreamingFileSink
configuration:


final StreamingFileSink> sink =
StreamingFileSink.forRowFormat(
new Path("s3://argo-artifacts/"),
new SimpleStringEncoder>("UTF-8"))
.withBucketAssigner(new KeyBucketAssigner())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(config)
.build();

Anyone know how to fix this exception?

java.lang.UnsupportedOperationException: This s3 file system
implementation does not support recoverable writers.
at 
org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
~[?:?]
at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:132)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:70)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]

-- 
Robert Cullen
240-475-4490


Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Kevin Lam
Hi all,

We're interested in doing some analysis on how the size of our savepoints
and state affects the time it takes to restore from a savepoint. We're
running Flink 1.12 and using RocksDB as a state backend, on Kubernetes.

What is the best way to measure the size of a Flink Application's state? Is
state.backend.rocksdb.metrics.total-sst-files-size

the right thing to look at?

We tried looking at state.backend.rocksdb.metrics.total-sst-files-size for
all our operators, after restoring from a savepoint, and we noticed that
the sum of all the sst files sizes is much much smaller than the total size
of our savepoint (7GB vs 10TB).  Where does that discrepancy come from?

Do you have any general advice on correlating savepoint size with restore
times?

Thanks in advance!


PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Sumeet Malhotra
Cross posting from StackOverlow here:

https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array

Any pointers are appreciated!

Thanks,
Sumeet


Re: clear() in a ProcessWindowFunction

2021-03-31 Thread Vishal Santoshi
I had a query. Say I have a single key with 2 live sessions (A and B)
with a configured lateness.

Do these invariants hold?

* The state is scoped to the key (created per key in the
ProcessWindowFunction, with a TTL).
* The state will remain alive irrespective of whether the window is closed
or not (a TTL timer does the collection).
* The execution on a key is sequential, as in if 2 events arrive for the
2 sessions they are processed sequentially (or in any order, but without the need
for synchronization).
* The state mutated by an event in Session A will be visible to Session B
if an event incident on Session B happens subsequently. There is no
need to synchronize access to the state, as it is for the same key.

What I am not sure about is what happens when session A merges with session
B. I would assume that it just defines a new start and end for the merged
window, GCs the old ones (or at least one of them) and assigns that
event to the new window. What one does with the custom state in the
ProcessWindowFunction (there is a CountTrigger of 1), really what is
done in the process method below, is one degree removed
from whatever Flink does internally with its merges, given that the state
is scoped to the key.
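
For illustration, a minimal sketch of key-scoped state with a TTL as discussed above (the
state name "session_uid" matches the code quoted later in this thread; the 1-day TTL and
the cleanup strategy are arbitrary placeholders):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class SessionIdStateSketch {
    public static ValueStateDescriptor<String> sessionIdDescriptor() {
        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("session_uid", String.class);
        // TTL so key-scoped entries are eventually cleaned up even if the
        // window merge logic never clears them explicitly
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .cleanupFullSnapshot()
                .build();
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}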







On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi 
wrote:

> Yep, makes sense.
>
> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan 
> wrote:
>
>> > Want to confirm that the keys are GCed ( along with state ) once the
>> (windows close + lateness ) ?
>> Window state is cleared (as well as the window itself), but global
>> state is not (unless you use TTL).
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>
>> Regards,
>> Roman
>>
>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
>>  wrote:
>> >
>> > Sometimes writing it down makes you think. I now realize that this is
>> not the right approach, given that merging windows will have their own
>> states..and how the merge happens is really at the key level
>> >
>> >
>> >
>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> >>
>> >> I intend to augment every event in a session  with a unique ID.  To
>> keep the session lean, there is a PurgingTrigger on this aggregate that
>> fires on a count of 1.
>> >>
>> >> >> (except that the number of keys can grow).
>> >>
>> >> Want to confirm that the keys are GCed ( along with state ) once the
>> (windows close + lateness ) ?
>> >>
>> >>
>> >>
>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan 
>> wrote:
>> >>>
>> >>> Hi Vishal,
>> >>>
>> >>> There is no leak in the code you provided (except that the number of
>> >>> keys can grow).
>> >>> But as you figured out the state is scoped to key, not to window+key.
>> >>>
>> >>> Could you explain what you are trying to achieve and why do you need
>> to combine
>> >>> sliding windows with state scoped to window+key?
>> >>>
>> >>> Regards,
>> >>> Roman
>> >>>
>> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>> >>>  wrote:
>> >>> >
>> >>> > Essentially, Does this code leak state
>> >>> >
>> >>> > private static class SessionIdProcessWindowFunction> java.io.Serializable, VALUE extends java.io.Serializable>
>> >>> > extends
>> >>> > ProcessWindowFunction,
>> KeyedSessionWithSessionID, KEY, TimeWindow> {
>> >>> > private static final long serialVersionUID = 1L;
>> >>> > private final static ValueStateDescriptor sessionId = new
>> ValueStateDescriptor("session_uid",
>> >>> > String.class);
>> >>> >
>> >>> > @Override
>> >>> > public void process(KEY key,
>> >>> > ProcessWindowFunction,
>> KeyedSessionWithSessionID, KEY, TimeWindow>.Context context,
>> >>> > Iterable> elements,
>> Collector> out)
>> >>> > throws Exception {
>> >>> > // I need this scoped to key/window
>> >>> > if (getRuntimeContext().getState(sessionId).value() == null) {
>> >>> > UUID uuid = UUID.randomUUID();
>> >>> > getRuntimeContext().getState(sessionId).update(uuid.toString());
>> >>> > }
>> >>> > String uuid = getRuntimeContext().getState(sessionId).value();
>> >>> > out.collect(new
>> KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
>> >>> > }
>> >>> > }
>> >>> >
>> >>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> >>> >>
>> >>> >> Hello folks,
>> >>> >>   The suggestion is to use windowState() for a key
>> key per window state and clear the state explicitly.  Also it seems that
>> getRuntime().getState() will return a globalWindow() where state is shared
>> among windows with the same key. I desire of course to have state scoped to
>> a key per window and was wanting to use windowState().. The caveat is that
>> my window is a Session Window and when I try to use clear()  I am thrown
>> this exception  ( Session Windows are Merging Windows )
>> >>> >>
>> >>> >> Caused by: java.lang.UnsupportedOperationException: Per-window
>> state is not allowed when using merging wi

Re: Restoring from Flink Savepoint in Kubernetes not working

2021-03-31 Thread Claude M
Thanks for your reply.  I'm using the flink docker
image flink:1.12.2-scala_2.11-java8.  Yes, the folder was created in S3.  I
took a look at the UI and it showed the following:

*Latest Restore ID: 49, Restore Time: 2021-03-31 09:37:43, Type: Checkpoint,
Path: s3:fcc82deebb4565f31a7f63989939c463/chk-49*

However, this is different from the savepoint path I specified.  I
specified the following:

*s3:savepoint2/savepoint-9fe457-504c312ffabe*

Is there anything specific you're looking for in the logs?  I did not find
any exceptions and there is a lot of sensitive information I would have to
extract from it.

Also, this morning, I tried creating another savepoint.  It first showed it
was In Progress.

curl 
http://localhost:8081/jobs/fcc82deebb4565f31a7f63989939c463/savepoints/4d19307dd99337257c4738871b1c63d8
{"status":{"id":"IN_PROGRESS"},"operation":null}

Then later when I tried to check the status, I saw the attached exception.

In the UI, I see the following:

*Latest Failed Checkpoint ID: 50, Failure Time: 2021-03-31 09:34:43, Cause:
Asynchronous task checkpoint failed.*

What does this failure mean?


On Wed, Mar 31, 2021 at 9:22 AM Matthias Pohl 
wrote:

> Hi Claude,
> thanks for reaching out to the Flink community. Could you provide the
> Flink logs for this run to get a better understanding of what's going on?
> Additionally, what exact Flink 1.12 version are you using? Did you also
> verify that the snapshot was created by checking the actual folder?
>
> Best,
> Matthias
>
> On Wed, Mar 31, 2021 at 4:56 AM Claude M  wrote:
>
>> Hello,
>>
>> I have Flink setup as an Application Cluster in Kubernetes, using Flink
>> version 1.12.  I created a savepoint using the curl command and the status
>> indicated it was completed.  I then tried to relaunch the job from that
>> save point using the following arguments as indicated in the doc found
>> here:
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes
>>
>> args: ["standalone-job", "--job-classname", "", "--job-id",
>> "", "--fromSavepoint", "s3:///",
>> "--allowNonRestoredState"]
>>
>> After the job launches, I check the offsets and they are not the same as
>> when the savepoint was created.  The job id passed in also does not match
>> the job id that was launched.  I even put an incorrect savepoint path to
>> see what happens and there were no errors in the logs and the job still
>> launches.  It seems these arguments are not even being evaluated.  Any
>> ideas about this?
>>
>>
>> Thanks
>>
>
{"errors":["org.apache.flink.runtime.rest.NotFoundException: Operation not 
found under key: 
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@4b261c41\n\tat
 
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers$StatusHandler.handleRequest
(AbstractAsynchronousOperationHandlers.java:182)\n\tat 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.handleRequest
(SavepointHandlers.java:219)\n\tat 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest
(AbstractRestHandler.java:83)\n\tat 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader
(AbstractHandler.java:195)\n\tat 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0
(LeaderRetrievalHandler.java:83)\n\tat 
java.util.Optional.ifPresent(Optional.java:159)\n\tat 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat
 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat
 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)\n\tat
 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)\n\tat
 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead
(SimpleChannelInboundHandler.java:99)\n\tat 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractCha

Re: IO benchmarking

2021-03-31 Thread Matthias Pohl
Hi Deepthi,
1. Have you had a look at flink-benchmarks [1]? I haven't used it but it
might be helpful.
2. Unfortunately, Flink doesn't provide metrics like that. But you might
want to follow FLINK-21736 [2] for future developments.
3. Is there anything specific you are looking for? Unfortunately, I don't
know any blogs for a more detailed summary. If you plan to look into the
code CheckpointCoordinator [3] might be a starting point. Alternatively,
something like MetadataV2V3SerializerBase [4] offers insights into how the
checkpoints' metadata is serialized.

Best,
Matthias

[1] https://github.com/apache/flink-benchmarks
[2] https://issues.apache.org/jira/browse/FLINK-21736
[3]
https://github.com/apache/flink/blob/11550edbd4e1874634ec441bde4fe4952fc1ec4e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1493
[4]
https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
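
As a concrete reference for the checkpointing path such a benchmark would exercise, here
is a minimal sketch of a job-side setup (the checkpoint URI, interval, and timeout are
placeholders; RocksDB with incremental checkpoints is just one possible configuration):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointBenchmarkSetupSketch {
    public static StreamExecutionEnvironment configure() throws java.io.IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // incremental RocksDB checkpoints written to the storage under test
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));
        // checkpoint every 60s with exactly-once barriers
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);
        return env;
    }
}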

On Tue, Mar 30, 2021 at 8:37 PM deepthi Sridharan <
deepthi.sridha...@gmail.com> wrote:

> Hi,
>
> I am trying to set up some benchmarking with a couple of IO options for
> saving checkpoints and have a couple of questions :
>
> 1. Does flink come with any IO benchmarking tools? I couldn't find any. I
> was hoping to use those to derive some insights about the storage
> performance and extrapolate it for the checkpoint use case.
>
> 2. Are there any metrics pertaining to restore from checkpoints? The only
> metric I can find is the last restore time, but neither the time it took to
> read the checkpoints, nor the time it took to restore the operator/task
> states seem to be covered. I am using RocksDB, but couldn't find any
> metrics relating to how much time it took to restore the state backend from
> rocksdb either.
>
> 3. I am trying to find documentation on how the states are serialized into
> the checkpoint files from multiple operators and tasks to tailor the
> testing use case, but can't seem to find any. Are there any blogs that go
> into this detail or would reading the code be the only option?
>
> --
> Thanks,
> Deepthi
>


Re: Restoring from Flink Savepoint in Kubernetes not working

2021-03-31 Thread Matthias Pohl
Hi Claude,
thanks for reaching out to the Flink community. Could you provide the Flink
logs for this run to get a better understanding of what's going on?
Additionally, what exact Flink 1.12 version are you using? Did you also
verify that the snapshot was created by checking the actual folder?

Best,
Matthias

On Wed, Mar 31, 2021 at 4:56 AM Claude M  wrote:

> Hello,
>
> I have Flink set up as an Application Cluster in Kubernetes, using Flink
> version 1.12.  I created a savepoint using the curl command and the status
> indicated it was completed.  I then tried to relaunch the job from that
> savepoint using the following arguments as indicated in the doc found
> here:
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes
>
> args: ["standalone-job", "--job-classname", "", "--job-id",
> "", "--fromSavepoint", "s3:///",
> "--allowNonRestoredState"]
>
> After the job launches, I check the offsets and they are not the same as
> when the savepoint was created.  The job id passed in also does not match
> the job id that was launched.  I even put an incorrect savepoint path to
> see what happens and there were no errors in the logs and the job still
> launches.  It seems these arguments are not even being evaluated.  Any
> ideas about this?
>
>
> Thanks
>


Re: DataStream from kafka topic

2021-03-31 Thread Arian Rohani
The issue at hand is that the record contains an unmodifiable collection,
which the Kryo serialiser attempts to modify by first initialising the
object and then adding items to the collection (IIRC).

Caused by: java.lang.UnsupportedOperationException
> at
> java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)


Without knowing the specifics of what exactly you are trying to
deserialise, I can only give a generic answer, which is to try
something like:


> StreamExecutionEnvironment see =
> StreamExecutionEnvironment.getExecutionEnvironment();
> Class unmodColl =
> Class.forName("java.util.Collections$UnmodifiableCollection");
> see.getConfig().addDefaultKryoSerializer(unmodColl,
> UnmodifiableCollectionsSerializer.class);
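
Spelled out with imports, that registration could look roughly like the sketch below. Note the assumption that UnmodifiableCollectionsSerializer comes from the de.javakaffee:kryo-serializers artifact (in a version compatible with the Kryo bundled by Flink); it is not part of Flink itself.

```java
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoRegistrationExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        // The collection class is package-private, so it has to be looked up by name.
        Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");

        // Register a Kryo serializer that can rebuild unmodifiable collections instead of
        // instantiating one and calling add(), which is what throws UnsupportedOperationException.
        see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);

        // ... define sources/transformations/sinks on `see` as usual, then:
        // see.execute("my-job");
    }
}
```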


An even better approach is to set up a local sandbox environment in Docker
with Kafka and a sink of your choice, and simply run the application
from the main method in debug mode with a breakpoint set right before it
throws the exception.

Kind regards,
Arian Rohani


On Wed, 31 Mar 2021 at 13:27, Matthias Pohl  wrote:

> Hi Maminspapin,
> I haven't worked with Kafka/Flink, yet. But have you had a look at the
> docs about the DeserializationSchema [1]? It
> mentions ConfluentRegistryAvroDeserializationSchema. Is this something
> you're looking for?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>
> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin  wrote:
>
>> I tried this:
>>
>> 1. Schema (found in stackoverflow)
>>
>> class GenericRecordSchema implements
>> KafkaDeserializationSchema {
>>
>> private String registryUrl;
>> private transient KafkaAvroDeserializer deserializer;
>>
>> public GenericRecordSchema(String registryUrl) {
>> this.registryUrl = registryUrl;
>> }
>>
>> @Override
>> public boolean isEndOfStream(GenericRecord nextElement) {
>> return false;
>> }
>>
>> @Override
>> public GenericRecord deserialize(ConsumerRecord
>> consumerRecord) throws Exception {
>> checkInitialized();
>> return (GenericRecord)
>> deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>> }
>>
>> @Override
>> public TypeInformation getProducedType() {
>> return TypeExtractor.getForClass(GenericRecord.class);
>> }
>>
>> private void checkInitialized() {
>> if (deserializer == null) {
>> Map props = new HashMap<>();
>>
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> registryUrl);
>>
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>> SchemaRegistryClient client =
>> new CachedSchemaRegistryClient(
>> registryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>> deserializer = new KafkaAvroDeserializer(client, props);
>> }
>> }
>> }
>>
>> 2. Consumer
>>
>> private static FlinkKafkaConsumer getConsumer(String
>> topic) {
>>
>> return new FlinkKafkaConsumer<>(
>> topic,
>> new GenericRecordSchema("http://xxx.xx.xxx.xx:8081";),
>> getConsumerProperties());
>> }
>>
>> But when I start the app, the following error is happen:
>>
>> com.esotericsoftware.kryo.KryoException:
>> java.lang.UnsupportedOperationException
>> Serialization trace:
>> reserved (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>>
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>> at
>>
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>> at
>>
>> org.apache.flink.streaming.r

Re: Proper way to get DataStream

2021-03-31 Thread Matthias Pohl
Hi Maminspapin again,
have you checked whether your topic actually contains data that matches
your schema specified through cep.model.User?

Best,
Matthias
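
One way to rule out a reader/writer mismatch (the second error below complains "Found user_visit.Envelope, expecting cep.model.User") is to deserialize with the schema the producer actually registered instead of a reflectively derived one. A sketch, assuming the default `<topic>-value` subject naming and the flink-avro-confluent-registry dependency on the classpath:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class RegistrySchemaSource {

    // Build a consumer that reads the topic with the writer's (registry) schema, so Avro never
    // has to resolve records against an incompatible reader schema.
    static FlinkKafkaConsumer<GenericRecord> consumerFor(
            String topic, String registryUrl, Properties consumerProps) throws Exception {

        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        Schema writerSchema = new Schema.Parser()
                .parse(client.getLatestSchemaMetadata(topic + "-value").getSchema());

        return new FlinkKafkaConsumer<>(
                topic,
                ConfluentRegistryAvroDeserializationSchema.forGeneric(writerSchema, registryUrl),
                consumerProps);
    }
}
```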

On Tue, Mar 30, 2021 at 3:39 PM Maminspapin  wrote:

> Hi,
>
> I'm trying to solve a task that involves getting data from a topic. This topic
> holds data in Avro format.
>
> I wrote next code:
>
>  public static void main(String[] args) throws Exception {
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> Schema schema = ReflectData.get().getSchema(User.class);
> FlinkKafkaConsumer userConsumer = new
> FlinkKafkaConsumer<>(
>"test_topic",
> *// First*
> AvroDeserializationSchema.forGeneric(schema),
> *// Second*
> //
> ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
> "http://xxx.xx.xxx.xx:8081";),
> getConsumerProperties());
>
> DataStream userStream =
> env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
> userStream.print("users");
>
> env.execute();
> }
>
> So, as I understand it, there are two ways to get the result:
> 1. AvroDeserializationSchema.forGeneric(schema)
> 2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
> "http://xxx.xx.xxx.xx:8081";)
>
> And I use ReflectData.get().getSchema(User.class) to get schema.
>
>
> Please, Flink guru, tell me if I am on the right way or not.
>
>
> If I use the first way, I get the following error:
>
> java.io.EOFException
> at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
> at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:150)
> at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)
>
> If I use the second way, I get the following error:
>
> Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
> expecting cep.model.User, missing required field userId
> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>
> How can I get the correct result?
>
> Sorry, if duplicated:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html
>
> Today is the third day I'm working on this issue (((
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: DataStream from kafka topic

2021-03-31 Thread Matthias Pohl
Ok, it looks like you've found that solution already based on your question
in [1].

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Proper-way-to-get-DataStream-lt-GenericRecord-gt-td42640.html

On Wed, Mar 31, 2021 at 1:26 PM Matthias Pohl 
wrote:

> Hi Maminspapin,
> I haven't worked with Kafka/Flink, yet. But have you had a look at the
> docs about the DeserializationSchema [1]? It
> mentions ConfluentRegistryAvroDeserializationSchema. Is this something
> you're looking for?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>
> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin  wrote:
>
>> I tried this:
>>
>> 1. Schema (found in stackoverflow)
>>
>> class GenericRecordSchema implements
>> KafkaDeserializationSchema {
>>
>> private String registryUrl;
>> private transient KafkaAvroDeserializer deserializer;
>>
>> public GenericRecordSchema(String registryUrl) {
>> this.registryUrl = registryUrl;
>> }
>>
>> @Override
>> public boolean isEndOfStream(GenericRecord nextElement) {
>> return false;
>> }
>>
>> @Override
>> public GenericRecord deserialize(ConsumerRecord
>> consumerRecord) throws Exception {
>> checkInitialized();
>> return (GenericRecord)
>> deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>> }
>>
>> @Override
>> public TypeInformation getProducedType() {
>> return TypeExtractor.getForClass(GenericRecord.class);
>> }
>>
>> private void checkInitialized() {
>> if (deserializer == null) {
>> Map props = new HashMap<>();
>>
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> registryUrl);
>>
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>> SchemaRegistryClient client =
>> new CachedSchemaRegistryClient(
>> registryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>> deserializer = new KafkaAvroDeserializer(client, props);
>> }
>> }
>> }
>>
>> 2. Consumer
>>
>> private static FlinkKafkaConsumer getConsumer(String
>> topic) {
>>
>> return new FlinkKafkaConsumer<>(
>> topic,
>> new GenericRecordSchema("http://xxx.xx.xxx.xx:8081";),
>> getConsumerProperties());
>> }
>>
>> But when I start the app, the following error is happen:
>>
>> com.esotericsoftware.kryo.KryoException:
>> java.lang.UnsupportedOperationException
>> Serialization trace:
>> reserved (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>>
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>> at
>>
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>> at
>>
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> at
>>
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> at
>>
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>> at
>>
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>> at
>>
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecor

Re: DataStream from kafka topic

2021-03-31 Thread Matthias Pohl
Hi Maminspapin,
I haven't worked with Kafka/Flink, yet. But have you had a look at the docs
about the DeserializationSchema [1]? It
mentions ConfluentRegistryAvroDeserializationSchema. Is this something
you're looking for?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema

On Tue, Mar 30, 2021 at 6:55 AM Maminspapin  wrote:

> I tried this:
>
> 1. Schema (found in stackoverflow)
>
> class GenericRecordSchema implements
> KafkaDeserializationSchema {
>
> private String registryUrl;
> private transient KafkaAvroDeserializer deserializer;
>
> public GenericRecordSchema(String registryUrl) {
> this.registryUrl = registryUrl;
> }
>
> @Override
> public boolean isEndOfStream(GenericRecord nextElement) {
> return false;
> }
>
> @Override
> public GenericRecord deserialize(ConsumerRecord
> consumerRecord) throws Exception {
> checkInitialized();
> return (GenericRecord)
> deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
> }
>
> @Override
> public TypeInformation getProducedType() {
> return TypeExtractor.getForClass(GenericRecord.class);
> }
>
> private void checkInitialized() {
> if (deserializer == null) {
> Map props = new HashMap<>();
>
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> registryUrl);
>
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
> SchemaRegistryClient client =
> new CachedSchemaRegistryClient(
> registryUrl,
> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
> deserializer = new KafkaAvroDeserializer(client, props);
> }
> }
> }
>
> 2. Consumer
>
> private static FlinkKafkaConsumer getConsumer(String topic)
> {
>
> return new FlinkKafkaConsumer<>(
> topic,
> new GenericRecordSchema("http://xxx.xx.xxx.xx:8081";),
> getConsumerProperties());
> }
>
> But when I start the app, the following error is happen:
>
> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
> at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
>
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> at
>
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
> at
>
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
> at
>
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
> at
>
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
>
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
> at
>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> 

Re: Flink Table to DataStream: how to access column name?

2021-03-31 Thread Yik San Chan
Thank you, Till!

Actually, I found I can access this via `Table.getSchema.getFieldNames` in
version 1.12.0.

Best,
Yik San
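
For reference, a small sketch of that approach (Java API shown for illustration; the Scala bridge is analogous, and `table`/`tableEnv` are the ones built in the quoted thread below):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

import java.util.Arrays;
import java.util.List;

// Resolve the column index once from the table schema, then reuse it in the map function,
// so the same mapping code works for sources with differently named columns.
List<String> fieldNames = Arrays.asList(table.getSchema().getFieldNames());
int clicksIdx = fieldNames.indexOf("last_5_clicks");

DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
retractStream
        .map(t -> (String) t.f1.getField(clicksIdx))
        .returns(Types.STRING);
```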

On Wed, Mar 31, 2021 at 4:26 PM Till Rohrmann  wrote:

> You are right Yik San. This feature has only been introduced in the
> upcoming 1.13 release [1]. Sorry for causing confusion here. I fear that
> you have to either use 1.13-SNAPSHOT or wait for the 1.13 release which
> should happen in a couple of weeks if you really need this feature.
>
> [1] https://issues.apache.org/jira/browse/FLINK-19981
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 6:33 PM Yik San Chan 
> wrote:
>
>> Hi Till,
>>
>> From the version I am using (1.12.0), getFieldNames is not available in
>> Row ... See
>> https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java
>> .
>>
>> Is there any workaround for this in version 1.12.0? Thanks.
>>
>> Best,
>> Yik San
>>
>> On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann 
>> wrote:
>>
>>> There is a method Row.getFieldNames.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan 
>>> wrote:
>>>
 Hi Till,

 I look inside the Row class, it does contain a member `private final
 Object[] fields;` though I wonder how to get column names out of the
 member?

 Thanks!

 Best,
 Yik San

 On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann 
 wrote:

> Hi Yik San,
>
> by converting the rows to a Tuple3 you effectively lose the
> information about the column names. You could also call
> `toRetractStream[Row]` which will give you a `DataStream[Row]` where you
> keep the column names.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <
> evan.chanyik...@gmail.com> wrote:
>
>> The question is cross-posted on Stack Overflow
>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>> .
>>
>> I want to consume a Kafka topic into a table using Flink SQL, then
>> convert it back to a DataStream.
>>
>> Here is the `SOURCE_DDL`:
>>
>> ```
>> CREATE TABLE kafka_source (
>> user_id BIGINT,
>> datetime TIMESTAMP(3),
>> last_5_clicks STRING
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'aiinfra.fct.userfeature.0',
>> 'properties.bootstrap.servers' = 'localhost:9092',
>> 'properties.group.id' = 'test-group',
>> 'format' = 'json'
>> )
>> ```
>>
>> With Flink, I execute the DDL.
>>
>> ```scala
>> val settings = EnvironmentSettings.newInstance.build
>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>> tableEnv.executeSql(SOURCE_DDL)
>> val table = tableEnv.from("kafka_source")
>> ```
>>
>> Then, I convert it into DataStream, and do downstream logic in the
>> `map(e => ...)` part.
>>
>> ```scala
>> tableEnv.toRetractStream[(Long, java.sql.Timestamp,
>> String)](table).map(e => ...)
>> ```
>>
>> Inside the `map(e => ...)` part, I would like to access the column
>> name, in this case, `last_5_clicks`. Why? Because I may have different
>> sources with different columns names (such as `last_10min_page_view`), 
>> but
>> I would like to reuse the code in `map(e => ...)`.
>>
>> Is there a way to do this? Thanks.
>>
>> Best,
>> Yik San
>>
>


Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Guowei Ma
Hi, community:

Friendly reminder that today (3.31) is the last day of feature development.
Under normal circumstances, you will not be able to submit new features
from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for testing;
everyone is welcome to help with testing.
After the test is relatively stable, we will cut the release-1.13 branch.

Best,
Dawid & Guowei


On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann  wrote:

> +1 for the 31st of March for the feature freeze.
>
> Cheers,
> Till
>
> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
> wrote:
>
> > +1 for March 31st for the feature freeze.
> >
> >
> >
> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz  >
> > wrote:
> >
> > > Thank you Thomas! I'll definitely check the issue you linked.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 23/03/2021 20:35, Thomas Weise wrote:
> > > > Hi Dawid,
> > > >
> > > > Thanks for the heads up.
> > > >
> > > > Regarding the "Rebase and merge" button. I find that merge option
> > useful,
> > > > especially for small simple changes and for backports. The following
> > > should
> > > > help to safeguard from the issue encountered previously:
> > > > https://github.com/jazzband/pip-tools/issues/1085
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
> > dwysakow...@apache.org
> > > >
> > > > wrote:
> > > >
> > > >> Hi devs, users!
> > > >>
> > > >> 1. *Feature freeze date*
> > > >>
> > > >> We are approaching the end of March which we agreed would be the
> time
> > > for
> > > >> a Feature Freeze. From the knowledge I've gathered so far it still
> seems
> > > to
> > > >> be a viable plan. I think it is a good time to agree on a particular
> > > date,
> > > >> when it should happen. We suggest *(end of day CEST) March 31st*
> > > >> (Wednesday next week) as the feature freeze time.
> > > >>
> > > >> Similarly as last time, we want to create RC0 on the day after the
> > > feature
> > > >> freeze, to make sure the RC creation process is running smoothly,
> and
> > to
> > > >> have a common testing reference point.
> > > >>
> > > >> Having said that, let us repeat, after Robert & Dian from the previous
> > > >> release, what a Feature Freeze means:
> > > >>
> > > >> *B) What does feature freeze mean?* After the feature freeze, no new
> > > >> features are allowed to be merged to master. Only bug fixes and
> > > >> documentation improvements.
> > > >> The release managers will revert new feature commits after the
> feature
> > > >> freeze.
> > > >> Rationale: The goal of the feature freeze phase is to improve the
> > system
> > > >> stability by addressing known bugs. New features tend to introduce
> new
> > > >> instabilities, which would prolong the release process.
> > > >> If you need to merge a new feature after the freeze, please open a
> > > >> discussion on the dev@ list. If there are no objections by a PMC
> > member
> > > >> within 48 (workday) hours, the feature can be merged.
> > > >>
> > > >> 2. *Merge PRs from the command line*
> > > >>
> > > >> In the past releases it was quite frequent around the Feature Freeze
> > > date
> > > >> that we ended up with a broken main branch that either did not
> compile
> > > or
> > > >> there were failing tests. It was often due to concurrent merges to
> the
> > > main
> > > >> branch via the "Rebase and merge" button. To overcome the problem we
> > > would
> > > >> like to suggest only ever merging PRs from a command line. Thank you
> > > >> Stephan for the idea! The suggested workflow would look as follows:
> > > >>
> > > >>1. Pull the change and rebase on the current main branch
> > > >>2. Build the project (e.g. from IDE, which should be faster than
> > > >>building entire project from cmd) -> this should ensure the
> project
> > > compiles
> > > >>3. Run the tests in the module that the change affects -> this
> > should
> > > >>greatly minimize the chances of failing tests
> > > >>4. Push the change to the main branch
> > > >>
> > > >> Let us know what you think!
> > > >>
> > > >> Best,
> > > >>
> > > >> Guowei & Dawid
> > > >>
> > > >>
> > > >>
> > >
> > >
> >
>


Re:Re: How does Flink SQL read Avro union?

2021-03-31 Thread Vincent Dong
Hi Arvid,

I don't control the schema of the Kafka source topic, since others also consume
this topic.
I use the Flink DataStream API to consume the topic and then transform the records into a
schema without a union field in it, to avoid the Flink SQL issue.

Cheers,
Vincent
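
A minimal sketch of that DataStream-side flattening (the Kafka source construction is omitted; the field names come from the Avro schema quoted below, and the tuple layout is just an example):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// `events` is a DataStream<GenericRecord> read from the topic. For this topic the union field
// always carries a RefundRow, so project out the needed fields and drop the union entirely.
DataStream<Tuple2<String, String>> refunds = events
        .map(record -> {
            GenericRecord refundRow = (GenericRecord) record.get("item_row");
            return Tuple2.of(refundRow.get("refund_id").toString(), refundRow.get("status").toString());
        })
        .returns(Types.TUPLE(Types.STRING, Types.STRING));
```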



At 2021-03-22 22:04:53, "Arvid Heise"  wrote:

Hi Vincent,


I'm not well into Flink SQL, so I'm pulling in Jark.


I have stopped using union records in your way and instead only use nullable 
fields (technically also a union field but much easier to handle in all 
languages).


So if you have a way to change the schema, maybe try it out:

  record RowEvent {
union { null, ItemRow } item_row default null;
union { null, RefundRow } refund_row default null;
  }





On Thu, Mar 18, 2021 at 7:35 AM Vincent Dong  wrote:

Hi All, 
  How does Flink SQL read a Kafka Avro message which has a union field?
  For me, the Avro schema is defined as follows:
```
  record ItemRow {
string num_iid;
string has_showcase;
string jdp_created;
  }


  record RefundRow {
string refund_id;
string status;
string jdp_created;
  }


  record RowEvent {
union { ItemRow, RefundRow } item_row;
  }
```
Now I'm sure that for a specific kafka topic, the item_row in all messages is 
RefundRow, but I don't know how to define the source table and query it.
Can I define the table in a way that forces Flink SQL to convert all messages to RefundRow?
Then I can `select status, refund_id from the_converted_table`.




Thanks
Vincent Dong




 

Re: How to specific key serializer

2021-03-31 Thread Tzu-Li (Gordon) Tai
Hi CZ,

The issue here is that the Scala DataStream API uses Scala macros to decide
the serializer to be used. Since that recognizes Scala case classes, the
CaseClassSerializer will be used.
However, in the State Processor API, those Scala macros do not come into
play, and therefore it directly goes to Flink's type extraction for Java
classes, which recognizes this as an Avro-generated class.
In general, currently the State Processor API doesn't support savepoints
written by Scala DataStream jobs that well.

You can try using TypeInfo annotations to specify a TypeInformationFactory
for your key class [1].
This allows you to "plug-in" the TypeInformation extracted by Flink for a
given class. In that custom TypeInformation, you should let it return the
correct serializer.

Best,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory
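
To illustrate the factory approach from [1], a rough sketch in Java (the key class and the returned TypeInformation are placeholders; whatever the factory returns decides the serializer, which is how both applications can be made to agree):

```java
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

import java.lang.reflect.Type;
import java.util.Map;

public class KeyTypeInfoExample {

    // Hypothetical key class; in the real case this is the avrohugger-generated class.
    @TypeInfo(MyKeyTypeInfoFactory.class)
    public static class MyKey {
        public String id;
    }

    // Whatever TypeInformation this factory returns is what Flink uses for MyKey wherever the
    // annotation is picked up, which in turn pins the serializer.
    public static class MyKeyTypeInfoFactory extends TypeInfoFactory<MyKey> {
        @Override
        public TypeInformation<MyKey> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            // Placeholder: return the TypeInformation whose serializer both the streaming job and
            // the State Processor job should share (e.g. an Avro-based one for the generated record).
            return new GenericTypeInfo<>(MyKey.class);
        }
    }
}
```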

On Mon, Mar 29, 2021 at 2:42 PM ChangZhuo Chen (陳昌倬) 
wrote:

> Hi,
>
> Currently we use sbt-avrohugger [0] to generate the key class for keyed
> state.  The key class generated by sbt-avrohugger is both a case class
> and an Avro specific record. However, in the following scenarios, Flink
> uses different serializers:
>
>
> * In streaming application, Flink uses CaseClassSerializer for key
>   class.
> * In state processor API application, Flink uses AvroSerializer for key
>   class.
>
>
> Since they use different serializers for the key, they are not compatible.
> Is there any way to specify the key serializer so that both applications
> use the same serializer?
>
>
>
> [0] https://github.com/julianpeeters/sbt-avrohugger
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: Fw:A question about flink watermark illustration in official documents

2021-03-31 Thread Matthias Pohl
Hi 罗昊,
the 2nd picture is meant to visualize the issue of out-of-orderness in
general. I'd say it's not referring to a specific strategy.

But one way to interpret the image is using the BoundedOutOfOrderness
strategy for watermark generation [1]: You can define an upper bound B for
the out-of-orderness. The watermark generator assumes that there's a delay
of B, i.e. for an event with timestamp T, no events older than T - B will
follow any more. The delayed watermarks you see in image 2 could be
achieved using this bounded out-of-orderness strategy.

The usage of watermark strategies is also addressed in the docs [2].

I hope this helps.
Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/eventtime/BoundedOutOfOrdernessWatermarks.java#L37
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#generating-watermarks
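
As a concrete sketch of that strategy (the event type, timestamp accessor and the 5-second bound are placeholders):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.time.Duration;

// `events` is some DataStream<MyEvent>; MyEvent/getTimestamp() stand in for your own event type.
// Watermarks will trail the highest seen timestamp by the bound B (5 seconds here), which is
// exactly the kind of "delayed" watermark shown in the second picture.
DataStream<MyEvent> withEventTime = events.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, previousTimestamp) -> event.getTimestamp()));
```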

On Tue, Mar 30, 2021 at 6:26 AM 罗昊  wrote:

> Recently I read the official Flink documentation about watermarks:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html
> There are two pictures illustrating the Flink watermark mechanism, which
> puzzle me much:
>
>
> The first picture is easy to understand, but in the second I wonder how
> we get w(11) and w(17).
> As we know, we can define how to generate watermarks in a Flink job; in
> other words, watermarks are generated by certain rules. So what are the
> rules by which the watermarks are generated in the second picture?
>
> I looked through almost all the official documents for the different Flink
> versions, and they use the same pictures.
> It puzzled me much. Is there any explanation?
> Waiting for your answers, thanks!
>


Re: Flink Table to DataStream: how to access column name?

2021-03-31 Thread Till Rohrmann
You are right Yik San. This feature has only been introduced in the
upcoming 1.13 release [1]. Sorry for causing confusion here. I fear that
you have to either use 1.13-SNAPSHOT or wait for the 1.13 release which
should happen in a couple of weeks if you really need this feature.

[1] https://issues.apache.org/jira/browse/FLINK-19981

Cheers,
Till

On Tue, Mar 30, 2021 at 6:33 PM Yik San Chan 
wrote:

> Hi Till,
>
> From the version I am using (1.12.0), getFieldNames is not available in
> Row ... See
> https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java
> .
>
> Is there any workaround for this in version 1.12.0? Thanks.
>
> Best,
> Yik San
>
> On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann 
> wrote:
>
>> There is a method Row.getFieldNames.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan 
>> wrote:
>>
>>> Hi Till,
>>>
>>> I look inside the Row class, it does contain a member `private final
>>> Object[] fields;` though I wonder how to get column names out of the
>>> member?
>>>
>>> Thanks!
>>>
>>> Best,
>>> Yik San
>>>
>>> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann 
>>> wrote:
>>>
 Hi Yik San,

 by converting the rows to a Tuple3 you effectively lose the information
 about the column names. You could also call `toRetractStream[Row]` which
 will give you a `DataStream[Row]` where you keep the column names.

 Cheers,
 Till

 On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan 
 wrote:

> The question is cross-posted on Stack Overflow
> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
> .
>
> I want to consume a Kafka topic into a table using Flink SQL, then
> convert it back to a DataStream.
>
> Here is the `SOURCE_DDL`:
>
> ```
> CREATE TABLE kafka_source (
> user_id BIGINT,
> datetime TIMESTAMP(3),
> last_5_clicks STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'aiinfra.fct.userfeature.0',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'test-group',
> 'format' = 'json'
> )
> ```
>
> With Flink, I execute the DDL.
>
> ```scala
> val settings = EnvironmentSettings.newInstance.build
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
> tableEnv.executeSql(SOURCE_DDL)
> val table = tableEnv.from("kafka_source")
> ```
>
> Then, I convert it into DataStream, and do downstream logic in the
> `map(e => ...)` part.
>
> ```scala
> tableEnv.toRetractStream[(Long, java.sql.Timestamp,
> String)](table).map(e => ...)
> ```
>
> Inside the `map(e => ...)` part, I would like to access the column
> name, in this case, `last_5_clicks`. Why? Because I may have different
> sources with different columns names (such as `last_10min_page_view`), but
> I would like to reuse the code in `map(e => ...)`.
>
> Is there a way to do this? Thanks.
>
> Best,
> Yik San
>



Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-31 Thread Sumeet Malhotra
Thanks Dawid. This looks like what I needed :-)
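
For anyone finding this thread later, a rough sketch of wiring that option into the DDL (Java shown, PyFlink's execute_sql takes the same statement; the topic, columns and watermark bound are placeholders, and it is worth verifying that the trailing 'Z' and microsecond precision of the incoming strings parse cleanly on your Flink version):

```java
// `tableEnv` is a (Stream)TableEnvironment; the JSON format options are the relevant part here.
tableEnv.executeSql(
        "CREATE TABLE events (" +
        "  event_time TIMESTAMP(3)," +
        "  payload STRING," +
        "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'my_topic'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'format' = 'json'," +
        "  'json.timestamp-format.standard' = 'ISO-8601'" +
        ")");
```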

On Tue, Mar 30, 2021 at 12:28 PM Dawid Wysakowicz 
wrote:

> Hey,
>
> I am not sure which format you use, but if you work with JSON maybe this
> option[1] could help you.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> On 30/03/2021 06:45, Sumeet Malhotra wrote:
>
> Thanks. Yes, that's a possibility. I'd still prefer something that can be
> done within the Table API. If it's not possible, then there's no other
> option but to use the DataStream API to read from Kafka, do the time
> conversion and create a table from it.
>
> ..Sumeet
>
> On Mon, Mar 29, 2021 at 10:41 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I hope someone else might have a better answer, but one thing that would
>> most likely work is to convert this field and define even time during
>> DataStream to table conversion [1]. You could always pre process this field
>> in the DataStream API.
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
>>
>> pon., 29 mar 2021 o 18:07 Sumeet Malhotra 
>> napisał(a):
>>
>>> Hi,
>>>
>>> Might be a simple, stupid question, but I'm not able to find how to
>>> convert/interpret a UTC datetime string like
>>> *2021-03-23T07:37:00.613910Z* as event-time using a DDL/Table API. I'm
>>> ingesting data from Kafka and can read this field as a string, but would
>>> like to mark it as event-time by defining a watermark.
>>>
>>> I'm able to achieve this using the DataStream API, by defining my own
>>> TimestampAssigner that converts the datetime string to milliseconds since
>>> epoch. How can I do this using a SQL DDL or Table API?
>>>
>>> I tried to directly interpret the string as TIMESTAMP(3) but it fails
>>> with the following exception:
>>>
>>> java.time.format.DateTimeParseException: Text
>>> '2021-03-23T07:37:00.613910Z' could not be parsed...
>>>
>>> Any pointers?
>>>
>>> Thanks!
>>> Sumeet
>>>
>>>