Re: [ANNOUNCE] Release 1.13.0, release candidate #0

2021-04-01 Thread Xingbo Huang
Hi Dawid,

Thanks a lot for the great work! Regarding the flink-python issue, I
have provided a quick fix and will try to get it resolved ASAP.

Best,
Xingbo

Dawid Wysakowicz wrote on Fri, Apr 2, 2021 at 4:04 AM:

> Hi everyone,
> As promised I created a release candidate #0 for the version 1.13.0. I am
> not starting a vote for this release as I've created it mainly for
> verifying the release process. We are still aware of some improvements
> coming in shortly. However, we will greatly appreciate any help testing this
> RC already. It can help tremendously in identifying any problems early.
>
> Unfortunately I was not able to create a binary convenience release for
> flink-python, because of a bug in the release scripts which can be tracked
> in https://issues.apache.org/jira/browse/FLINK-22095
>
> The complete staging area is available for your review, which includes:
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [1], which are signed with the key with
> fingerprint 31D2DD10BFC15A2D [2],
> * all artifacts to be deployed to the Maven Central Repository [3],
> * source code tag "release-1.13.0-rc0" [4],
>
> Your help testing the release will be greatly appreciated!
>
> Thanks,
> Dawid Wysakowicz
>
> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.0-rc0/
> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> [3]
> https://repository.apache.org/content/repositories/orgapacheflink-1417/
> [4] https://github.com/apache/flink/tree/release-1.13.0-rc0
>


Why is Hive dependency flink-sql-connector-hive not available on Maven Central?

2021-04-01 Thread Yik San Chan
The question is cross-posted in StackOverflow
https://stackoverflow.com/questions/66914119/flink-why-is-hive-dependency-flink-sql-connector-hive-not-available-on-maven-ce

According to [Flink SQL Hive: Using bundled hive jar](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar
):

> The following tables list all available bundled hive jars. You can pick
one and copy it to the /lib/ directory of your Flink distribution.
> - flink-sql-connector-hive-1.2.2 (download link)
> - flink-sql-connector-hive-2.2.0 (download link)
> ...

However, these dependencies are not available from Maven Central. As a
workaround, I use [user defined dependencies](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#user-defined-dependencies),
but this is not recommended:

> the recommended way to add dependency is to use a bundled jar. Separate
jars should be used only if bundled jars don’t meet your needs.

I wonder why the bundled jars are not available on Maven Central?

Follow-up: Since they are not available from Maven Central, how can I
include them in pom.xml in order to run `mvn package`?

Thanks!
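One way to let `mvn package` resolve a jar that is only published as a
download (rather than on Maven Central) is to install it into the local Maven
repository first. A minimal sketch; the file name and coordinates below are
illustrative only and have to match the jar you actually downloaded:

# Install the downloaded bundled jar into the local Maven repository so it
# can then be declared in pom.xml like any other dependency.
mvn install:install-file \
  -Dfile=flink-sql-connector-hive-2.2.0_2.11-1.12.2.jar \
  -DgroupId=org.apache.flink \
  -DartifactId=flink-sql-connector-hive-2.2.0_2.11 \
  -Dversion=1.12.2 \
  -Dpackaging=jar

Note that the Hive documentation quoted above recommends dropping the bundled
jar into Flink's /lib/ directory rather than packaging it into the job jar, in
which case it does not need to appear in pom.xml at all (or only with provided
scope).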


Re: Avro schema

2021-04-01 Thread Sumeet Malhotra
Just realized, my question was probably not clear enough. :-)

I understand that the Avro (or JSON for that matter) format can be ingested
as described here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#apache-avro-format,
but this still requires the entire table specification to be written in the
"CREATE TABLE" section. Is it possible to just specify the Avro schema and
let Flink map it to an SQL table?

BTW, the above link is titled "Table API Legacy Connectors", so is this
still supported? Same question for YAML specification.

Thanks,
Sumeet
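For what it's worth, a minimal sketch of deriving the Flink type from an Avro
schema string instead of re-declaring every column by hand. It assumes
flink-avro is on the classpath and that its AvroSchemaConverter provides the
convertToDataType helper; the schema itself is only an illustration:

import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.DataType;

public class AvroSchemaToTableType {

    public static void main(String[] args) {
        // Illustrative Avro schema; in practice it would be read from an
        // .avsc file or fetched from a schema registry.
        String avroSchema =
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"userId\",\"type\":\"string\"},"
                + "{\"name\":\"visits\",\"type\":\"long\"}]}";

        // AvroSchemaConverter derives a Flink DataType from the Avro schema,
        // which can then be used to build the table schema instead of
        // spelling out every column in CREATE TABLE.
        DataType rowType = AvroSchemaConverter.convertToDataType(avroSchema);
        System.out.println(rowType);
    }
}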

On Fri, Apr 2, 2021 at 8:26 AM Sumeet Malhotra 
wrote:

> Hi,
>
> Is it possible to directly import Avro schema while ingesting data into
> Flink? Or do we always have to specify the entire schema in either SQL DDL
> for Table API or using DataStream data types? From a code maintenance
> standpoint, it would be really helpful to keep one source of truth for the
> schema somewhere.
>
> Thanks,
> Sumeet
>


Avro schema

2021-04-01 Thread Sumeet Malhotra
Hi,

Is it possible to directly import Avro schema while ingesting data into
Flink? Or do we always have to specify the entire schema in either SQL DDL
for Table API or using DataStream data types? From a code maintenance
standpoint, it would be really helpful to keep one source of truth for the
schema somewhere.

Thanks,
Sumeet


Re: [DISCUSS] Feature freeze date for 1.13

2021-04-01 Thread Guowei Ma
Hi, Yuval

Thanks for your contribution. I am not a SQL expert, but it seems to be
beneficial to users, the amount of code is not much, and only the tests are
left. Therefore, I am open to this going into RC1.
But according to the rules, you still have to wait 48 hours to see whether
any other PMC member objects.

Best,
Guowei


On Thu, Apr 1, 2021 at 10:33 PM Yuval Itzchakov  wrote:

> Hi All,
>
> I would really love to merge https://github.com/apache/flink/pull/15307
> prior to 1.13 release cutoff, it just needs some more tests which I can
> hopefully get to today / tomorrow morning.
>
> This is a critical fix as now predicate pushdown won't work for any stream
> which generates a watermark and wants to push down predicates.
>
> On Thu, Apr 1, 2021, 10:56 Kurt Young  wrote:
>
>> Thanks Dawid, I have merged FLINK-20320.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Apr 1, 2021 at 2:49 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi all,
>>>
>>> @Kurt @Arvid I think it's fine to merge those two, as they are pretty
>>> much finished. We can wait for those two before creating the RC0.
>>>
>>> @Leonard Personally I'd be ok with 3 more days for that single PR. I
>>> find the request reasonable and I second that it's better to have a proper
>>> review rather than rush an unfinished feature and try to fix it later.
>>> Moreover it got broader support. Unless somebody else objects, I think we
>>> can merge this PR later and include it in RC1.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 01/04/2021 08:39, Arvid Heise wrote:
>>>
>>> Hi Dawid and Guowei,
>>>
>>> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We are
>>> pretty much just waiting for AZP to turn green, it's separate from other
>>> components, and it's a super useful feature for Flink users.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> [1] https://github.com/apache/flink/pull/15054
>>>
>>> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:
>>>
 Hi Guowei and Dawid,

 I want to request the permission to merge this feature [1], it's a
 useful improvement to sql client and won't affect
 other components too much. We were planning to merge it yesterday but we met
 a tricky multi-process issue which
 had a very high chance of hanging the tests. It took us a while to
 find out the root cause and fix it.

 Since it's not too far away from feature freeze and RC0 also not
 created yet, thus I would like to include this
 in 1.13.

 [1] https://issues.apache.org/jira/browse/FLINK-20320

 Best,
 Kurt


 On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:

> Hi, community:
>
> Friendly reminder that today (3.31) is the last day of feature
> development. Under normal circumstances, you will not be able to submit 
> new
> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
> testing; you are welcome to help test it together.
> After the test is relatively stable, we will cut the release-1.13
> branch.
>
> Best,
> Dawid & Guowei
>
>
> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
> wrote:
>
>> +1 for the 31st of March for the feature freeze.
>>
>> Cheers,
>> Till
>>
>> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
>> wrote:
>>
>> > +1 for March 31st for the feature freeze.
>> >
>> >
>> >
>> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
>> dwysakow...@apache.org>
>> > wrote:
>> >
>> > > Thank you Thomas! I'll definitely check the issue you linked.
>> > >
>> > > Best,
>> > >
>> > > Dawid
>> > >
>> > > On 23/03/2021 20:35, Thomas Weise wrote:
>> > > > Hi Dawid,
>> > > >
>> > > > Thanks for the heads up.
>> > > >
>> > > > Regarding the "Rebase and merge" button. I find that merge
>> option
>> > useful,
>> > > > especially for small simple changes and for backports. The
>> following
>> > > should
>> > > > help to safeguard from the issue encountered previously:
>> > > > https://github.com/jazzband/pip-tools/issues/1085
>> > > >
>> > > > Thanks,
>> > > > Thomas
>> > > >
>> > > >
>> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
>> > dwysakow...@apache.org
>> > > >
>> > > > wrote:
>> > > >
>> > > >> Hi devs, users!
>> > > >>
>> > > >> 1. *Feature freeze date*
>> > > >>
>> > > >> We are approaching the end of March which we agreed would be
>> the time
>> > > for
>> > > >> a Feature Freeze. From the knowledge I've gathered so far it
>> still seems
>> > > to
>> > > >> be a viable plan. I think it is a good time to agree on a
>> particular
>> > > date,
>> > > >> when it should happen. We suggest *(end of day CEST) March
>> 31st*
>> > > >> (Wednesday next week) as the feature freeze time.
>> > > >>
>> > > >> Similarly as last time, we want to create RC0 on the day after
>> the
>>>

How to know if task-local recovery kicked in for some nodes?

2021-04-01 Thread Sonam Mandal
Hello,

We are experimenting with task local recovery and I wanted to know whether 
there is a way to validate that some tasks of the job recovered from the local 
state rather than the remote state.

We've currently set this up to have 2 Task Managers with 2 slots each, and we 
run a job with parallelism 4. To simulate failure, we kill one of the Task 
Manager pods (we run on Kubernetes). I want to see if the local state of the 
other Task Manager was used or not. I do understand that the state for the 
killed Task Manager will need to be fetched from the checkpoint.

Also, do you have any suggestions on how to test such failure scenarios in a 
better way?

Thanks,
Sonam


Re: Flink Taskmanager failure recovery and large state

2021-04-01 Thread Guowei Ma
Hi, Yaroslav

AFAIK Flink does not retry if downloading the checkpoint from storage
fails. On the other hand, the FileSystem already has its own retry mechanism,
so I think there is no need for Flink to retry.
I am not very sure, but from the log it seems that the GCS retry is
interrupted for some reason. So I think we could get more insight if we
could find the first failure cause.

Best,
Guowei


On Fri, Apr 2, 2021 at 12:07 AM Yaroslav Tkachenko <
yaroslav.tkache...@shopify.com> wrote:

> Hi Guowei,
>
> I thought Flink can support any HDFS-compatible object store like the
> majority of Big Data frameworks. So we just added
> "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2"
> dependencies to the classpath, after that using "gs" prefix seems to be
> possible:
>
> state.checkpoints.dir: gs:///flink-checkpoints
> state.savepoints.dir: gs:///flink-savepoints
>
> And yes, I noticed the retry logging too, but I'm not sure if it's
> implemented on the Flink side or the GCS connector side? Probably need to
> dive deeper into the source code. And if it's implemented on the GCS
> connector side, will Flink wait for all the retries? That's why I asked
> about the potential timeout on the Flink side.
>
> The JM log doesn't have much besides what I already posted. It's hard
> for me to share the whole log, but the RocksDB initialization part can be
> relevant:
>
> 16:03:41.987 [cluster-io-thread-3] INFO
>  org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to
> configure application-defined state backend:
> RocksDBStateBackend{checkpointStreamBackend=File State Backend
> (checkpoints: 'gs:///flink-checkpoints', savepoints:
> 'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
> 1048576), localRocksDbDirectories=[/rocksdb],
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
> writeBatchSize=2097152}
> 16:03:41.988 [cluster-io-thread-3] INFO
>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
> predefined options: FLASH_SSD_OPTIMIZED.
> 16:03:41.988 [cluster-io-thread-3] INFO
>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
> application-defined options factory:
> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4,
> state.backend.rocksdb.block.blocksize=16 kb,
> state.backend.rocksdb.block.cache-size=64 mb}}.
> 16:03:41.988 [cluster-io-thread-3] INFO
>  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined
> state backend: RocksDBStateBackend{checkpointStreamBackend=File State
> Backend (checkpoints: 'gs:///flink-checkpoints', savepoints:
> 'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
> 1048576), localRocksDbDirectories=[/rocksdb],
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
> writeBatchSize=2097152}
>
> Thanks!
>
> On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma  wrote:
>
>> Hi, Yaroslav
>>
>> AFAIK there is no official GCS FileSystem support in Flink. Is the GCS
>> FileSystem implemented by yourself?
>> Would you like to share the whole JM log?
>>
>> BTW: From the following log I think the implementation already has some
>> retry mechanism.
>> >>> Interrupted while sleeping before retry. Giving up after 1/10 retries
>> for 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
>> yaroslav.tkache...@shopify.com> wrote:
>>
>>> Hi everyone,
>>>
>>> I'm wondering if people have experienced issues with Taskmanager failure
>>> recovery when dealing with a lot of state.
>>>
>>> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints
>>> and checkpoints. ~150 task managers with 4 slots each.
>>>
>>> When I run a pipeline without much state and kill one of the
>>> taskmanagers, it takes a few minutes to recover (I see a few restarts), but
>>> eventually when a new replacement taskmanager is registered with the
>>> jobmanager things go back to healthy.
>>>
>>> But when I run a pipeline with a lot of state (1TB+) and kill one of the
>>> taskmanagers, the pipeline never recovers, even after the replacement
>>> taskmanager has joined. It just enters an infinite loop of restarts and
>>> failures.
>>>
>>> On the jobmanager, I see an endless loop of state transitions: RUNNING
>>> -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
>>> It stays in RUNNING for a few seconds, but then transitions into FAILED
>>> with a message like this:
>>>
>>>
>>> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph - 
>>> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
>>> FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
>>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>>> readAddress(..) failed: Connection reset by peer (connection to '
>>> 10.30.10.53/10.30.10.53:45789')
>>> at
>>> or

Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

2021-04-01 Thread Guowei Ma
Hi, Robert
It seems that your AccessKeyId is not valid.
I think you can find more details in [1] about how to configure the
S3 access key.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/
Best,
Guowei
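For reference, a minimal flink-conf.yaml sketch of the credential settings
described in [1]; the values are placeholders, and the endpoint/path-style
options are only needed for S3-compatible stores (e.g. MinIO) rather than
AWS S3:

s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
# Only for self-hosted S3-compatible stores:
# s3.endpoint: http://minio.example.com:9000
# s3.path.style.access: true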


On Thu, Apr 1, 2021 at 9:19 PM Robert Cullen  wrote:

> Guowei,
>
>  I changed to “s3a://argo-artifacts/” but now I get the error below. BTW, I'm
> using the example playground from here:
>
> [1] https://docs.ververica.com/getting_started/installation.html
>
> org.apache.flink.util.SerializedThrowable: 
> 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext: initiate MultiPartUpload 
> on 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext: 
> com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you 
> provided does not exist in our records. (Service: Amazon S3; Status Code: 
> 403; Error Code: InvalidAccessKeyId; Request ID: RMD85E1G3WAK18VE; S3 
> Extended Request ID: 
> VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=; 
> Proxy: null), S3 Extended Request ID: 
> VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=:InvalidAccessKeyId
> at 
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218) ~[?:?]
> at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) ~[?:?]
> at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) 
> ~[?:?]
> at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) 
> ~[?:?]
> at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) ~[?:?]
> at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) ~[?:?]
> at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>  ~[?:?]
> at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198)
>  ~[?:?]
> at 
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:62)
>  ~[?:?]
> at 
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:253)
>  ~[?:?]
> at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68)
>  ~[?:?]
> at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:78)
>  ~[?:?]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90)
>  ~[flink-table-blink_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.openNewInProgressFile(RowWiseBucketWriter.java:34)
>  ~[flink-table-blink_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRe

[ANNOUNCE] Release 1.13.0, release candidate #0

2021-04-01 Thread Dawid Wysakowicz
Hi everyone,
As promised I created a release candidate #0 for the version 1.13.0. I
am not starting a vote for this release as I've created it mainly for
verifying the release process. We are still aware of some improvements
coming in shortly. However, we will greatly appreciate any help testing
this RC already. It can help tremendously in identifying any problems early.

Unfortunately I was not able to create a binary convenience release for
flink-python, because of a bug in the release scripts which can be
tracked in https://issues.apache.org/jira/browse/FLINK-22095

The complete staging area is available for your review, which includes:
* the official Apache source release and binary convenience releases to
be deployed to dist.apache.org [1], which are signed with the key with
fingerprint 31D2DD10BFC15A2D [2],
* all artifacts to be deployed to the Maven Central Repository [3],
* source code tag "release-1.13.0-rc0" [4],

Your help testing the release will be greatly appreciated!

Thanks,
Dawid Wysakowicz

[1] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.0-rc0/
[2] https://dist.apache.org/repos/dist/release/flink/KEYS
[3] https://repository.apache.org/content/repositories/orgapacheflink-1417/
[4] https://github.com/apache/flink/tree/release-1.13.0-rc0




Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-01 Thread Fuyao Li
Hi Yang,

Thanks for sharing the insights.

For problem 1:
I think I can’t do telnet in the container. I tried to use curl
144.25.13.78:8081 and I could see the HTML of the Flink dashboard UI. This proves
the public IP is reachable inside the cluster. Just as you mentioned, there
might still be some network issues with the cluster. I will do some further
checking.

For problem 2:
I created a new K8S cluster with a bastion server that has a public IP assigned
to it. Finally, I can see something valid from my browser. (There still exist some
problems with connecting to some databases, but I think these network problems
are not directly related to Flink; I can investigate them later.)

For problem 3:
Thanks for sharing the repo you created. I am not sure how much work it could
take to develop a deployer. I understand it depends on proficiency, but could
you give a rough estimate? If it is too complicated and the other options
are not significantly inferior to native Kubernetes, I might prefer to choose
one of them. I am currently comparing different options for deploying on
Kubernetes.

  1.  Standalone K8S
  2.  Native Kubernetes
  3.  Flink operator (Google Cloud Platform/ Lyft) [1][2]

I also watched the demo video you presented. [3] I noticed you mentioned that
native K8S is not going to replace the other two options. I still don’t fully
get your idea from the limited explanation in the demo. Could you compare the
trade-offs a little bit? Thanks!
[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[2]  https://github.com/lyft/flinkk8soperator
[3] https://youtu.be/pdFPr_VOWTU

Best,
Fuyao


From: Yang Wang 
Date: Tuesday, March 30, 2021 at 19:15
To: Fuyao Li 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Thanks for sharing the progress.

1. The flink client is able to list/cancel jobs. Based on the logs shared above, I
should be able to ping 144.25.13.78, so why can I still NOT ping that address?

I think this is an environment problem. Actually, not every IP address can be
tested with the "ping" command. I suggest you use "telnet 144.25.13.78:8081"
to check the network connectivity.

2. Why is 144.25.13.78:8081 not accessible from outside, I mean from my laptop’s
browser? I am within the company’s VPN and such a public load balancer should
expose the Flink Web UI, right? I tried to debug the network configuration but
failed to find a reason; could you give me some hints?

Just like my above answer, I think you need to check the network connectivity
via "telnet 144.25.13.78:8081". Maybe the firewall does not allow connections
from your local machine (e.g. your local IP is not in the whitelist of the
LoadBalancer).

In production, what is the suggested approach to list and cancel jobs? The
current manual work of “kubectl exec” into pods is not very reliable. How can we
automate this process and integrate it with CI/CD? Please share some blogs if
there are any, thanks.

I think in a production environment you should have your own deployer, which
will take care of submitting the jobs and listing/cancelling them. The deployer
could even help with triggering savepoints and managing the whole lifecycle of
Flink applications. I developed a PoC of a native-flink-k8s-operator [1]. It could
be a starting point for your own deployer if you want to develop it in Java.

[1]. 
https://github.com/wangyang0918/flink-native-k8s-operator


Best,
Yang
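As a rough sketch of what such a deployer (or a CI/CD job) can call under the
hood: with native Kubernetes the regular Flink CLI can list and cancel jobs
from outside the pods, as long as the REST endpoint is reachable from where
the CLI runs. Cluster id, namespace and job id below are placeholders:

# List the jobs of a native Kubernetes session cluster
./bin/flink list --target kubernetes-session \
    -Dkubernetes.cluster-id=my-session-cluster \
    -Dkubernetes.namespace=default

# Cancel a specific job
./bin/flink cancel --target kubernetes-session \
    -Dkubernetes.cluster-id=my-session-cluster \
    -Dkubernetes.namespace=default \
    <job-id>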

Fuyao Li <fuyao...@oracle.com> wrote on Wed, Mar 31, 2021 at 6:37 AM:
Hello Yang,

Thank you so much for providing the flink-client.yaml. I was able to make
some progress. I didn’t realize I should create a new flink-client pod to
list/cancel jobs; I was trying to do that from my local laptop. Maybe that is
the reason why it doesn’t work. However, I still have several questions.

I created the deployment based on your flink-client.yaml
For the LoadBalancer mode:

After applying the cluster role binding yaml below.

# 
https://kubernetes.io/docs/reference/access-authn-authz/rbac/
# 
https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services

Re: DataStream<GenericRecord> from kafka topic

2021-04-01 Thread Arian Rohani
Thank you Arvid, I was going to suggest something like this also.
We use Testcontainers and the docker images provided by Ververica to do
exactly this in our team.

I am currently working on a small project on GitHub to start sharing for
use cases like this.
The project will contain some example sources and example sinks together
with a generic Flink application.
I will follow up sometime during the weekend with a PoC. It's super
straightforward to set up and use.

To elaborate a bit more on Arvid's suggestion:

   - Use TestContainers as a base to configure your integration test.
   - LocalStack is a fully
   functional docker container that you can use to mock various AWS services.
   Since it's unclear what sink you're using, I just want to throw this out
   there.
   - Set up two containers abstracting the job manager and task manager
   according to the documentation. If you decide to go with the application
   cluster route then
   I suggest setting up the task manager and job manager as GenericContainers.
   The rationale is that if you do everything in docker-compose and use a
   DockerComposeContainer the application will start before you have a chance
   to mock the data in your source as the DockerComposeContainer is started
   immediately iirc (which may be problematic depending on the way your
   application is configured to read from Kafka).

In fact one of the major benefits is that you simply configure the source
and sink and run the application outside of docker (as a
LocalStreamEnvironment).
This enables you to set breakpoints where the application is throwing the
exception, which is especially valuable in circumstances like this where the
stacktrace is not super descriptive.

Best,
Arian Rohani
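To make the Testcontainers suggestion a bit more concrete, here is a minimal
sketch of spinning up a throwaway Kafka broker for such an ITCase. The image
tag and topic name are arbitrary, and a real test would point the Flink job's
Kafka consumer at kafka.getBootstrapServers():

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaContainerSketch {

    public static void main(String[] args) throws Exception {
        // Start a throwaway Kafka broker in Docker for the duration of the test.
        try (KafkaContainer kafka =
                 new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))) {
            kafka.start();

            // Produce one test record; the job under test would consume it from
            // the same bootstrap servers.
            Properties props = new Properties();
            props.put("bootstrap.servers", kafka.getBootstrapServers());
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test_topic", "key", "value")).get();
            }
        }
    }
}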


On Thu, Apr 1, 2021 at 15:00, Arvid Heise wrote:

> Arian gave good pointers, but I'd go even further: you should have ITCases
> where you pretty much just execute a mini job with docker-based Kafka and
> run it automatically.
> I strongly recommend checking out testcontainers [1]; it makes writing
> such a test a really smooth experience.
>
> [1] https://www.testcontainers.org/modules/kafka/
>
>
> On Wed, Mar 31, 2021 at 2:29 PM Arian Rohani 
> wrote:
>
>> The issue at hand is that the record contains an unmodifiable collection
>> which the kryo serialiser attempts to modify by first initialising the
>> object and then adding items to the collection (iirc).
>>
>> Caused by: java.lang.UnsupportedOperationException
>>> at
>>> java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>>
>>
>> Without knowing the specifics of what it is exactly you are trying to
>> deserialise I can only attempt to give a generic answer which is to try
>> something like:
>>
>>
>>> StreamExecutionEnvironment see =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> Class<?> unmodColl =
>>> Class.forName("java.util.Collections$UnmodifiableCollection");
>>> see.getConfig().addDefaultKryoSerializer(unmodColl,
>>> UnmodifiableCollectionsSerializer.class);
>>
>>
>> An even better approach is to set up a local sandbox environment in
>> docker with Kafka and a sink of your choice and simply run the
>> application from the main method in debug mode, setting a breakpoint
>> right before it throws the exception.
>>
>> Kind regards,
>> Arian Rohani
>>
>>
>> On Wed, Mar 31, 2021 at 13:27, Matthias Pohl wrote:
>>
>>> Hi Maminspapin,
>>> I haven't worked with Kafka/Flink, yet. But have you had a look at the
>>> docs about the DeserializationSchema [1]? It
>>> mentions ConfluentRegistryAvroDeserializationSchema. Is this something
>>> you're looking for?
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>>>
>>> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin  wrote:
>>>
 I tried this:

 1. Schema (found in stackoverflow)

 class GenericRecordSchema implements
 KafkaDeserializationSchema<GenericRecord> {

 private String registryUrl;
 private transient KafkaAvroDeserializer deserializer;

 public GenericRecordSchema(String registryUrl) {
 this.registryUrl = registryUrl;
 }

 @Override
 public boolean isEndOfStream(GenericRecord nextElement) {
 return false;
 }

 @Override
 public GenericRecord deserialize(ConsumerRecord<byte[], byte[]>
 consumerRecord) throws Exception {
 checkInitialized();
 return (GenericRecord)
 deserializer.deserialize(consumerRecord.topic(),
 consumerRecord.value());
 }

 @Override
 public TypeInformation<GenericRecord> getProducedType() {
 return TypeExtractor.getForClass(GenericRecord.class);
 }

 private void checkI

Question about setting up Task-local recovery with a RocksDB state backend

2021-04-01 Thread Sonam Mandal
Hello,

I've been going through the documentation for task-local recovery and came
across the section which discusses that, with incremental checkpoints enabled,
task-local recovery incurs no additional storage cost. The caveat mentioned
indicates that the task-local recovery state and all the RocksDB local state
must be on a single physical device to allow the use of hard links. I wanted to
understand how to ensure that our RocksDB local state is on the same physical
device as the task-local recovery data.

I came across a couple of config options we can set to point the RocksDB local 
state to a directory of our choosing, along with the task local recovery 
directory. Do I need to set both up for task local recovery to work correctly? 
What are the default paths if I don't set up these configs? (we are using 
Kubernetes - assume that /opt/flink/local-state below corresponds to a given 
physical drive)


state.backend.rocksdb.localdir: /opt/flink/local-state/rocksdblocaldir

taskmanager.state.local.root-dirs: /opt/flink/local-state/tasklocaldir

Do these configs make any difference if we turn off incremental checkpointing 
for RocksDB? Also, setting up this localdir for RocksDB won't affect 
checkpointing and where the checkpoints are stored, right?

After setting up the above two configs, I ran into some issues where the job 
would just disappear (or fail) if the Task Manager pod got killed (whereas 
without this, the job resumed correctly from the last checkpoint after the task 
manager pod was killed).

Thanks,
Sonam
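For reference, a minimal flink-conf.yaml sketch of the setup described above,
assuming local recovery itself is switched on via state.backend.local-recovery
(the paths are examples for a single volume mounted at /opt/flink/local-state):

state.backend: rocksdb
state.backend.incremental: true
state.backend.local-recovery: true
state.backend.rocksdb.localdir: /opt/flink/local-state/rocksdblocaldir
taskmanager.state.local.root-dirs: /opt/flink/local-state/tasklocaldir

As far as I understand, checkpoints themselves still go to the configured
state.checkpoints.dir; the directories above only control where the working
RocksDB instance and the local recovery copies live.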



Re: Flink Taskmanager failure recovery and large state

2021-04-01 Thread Yaroslav Tkachenko
Hi Guowei,

I thought Flink can support any HDFS-compatible object store like the
majority of Big Data frameworks. So we just added
"flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2"
dependencies to the classpath, after that using "gs" prefix seems to be
possible:

state.checkpoints.dir: gs:///flink-checkpoints
state.savepoints.dir: gs:///flink-savepoints

And yes, I noticed the retry logging too, but I'm not sure if it's
implemented on the Flink side or the GCS connector side? Probably need to
dive deeper into the source code. And if it's implemented on the GCS
connector side, will Flink wait for all the retries? That's why I asked
about the potential timeout on the Flink side.

The JM log doesn't have much besides what I already posted. It's hard
for me to share the whole log, but the RocksDB initialization part can be
relevant:

16:03:41.987 [cluster-io-thread-3] INFO
 org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to
configure application-defined state backend:
RocksDBStateBackend{checkpointStreamBackend=File State Backend
(checkpoints: 'gs:///flink-checkpoints', savepoints:
'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
1048576), localRocksDbDirectories=[/rocksdb],
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
writeBatchSize=2097152}
16:03:41.988 [cluster-io-thread-3] INFO
 org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
predefined options: FLASH_SSD_OPTIMIZED.
16:03:41.988 [cluster-io-thread-3] INFO
 org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
application-defined options factory:
DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4,
state.backend.rocksdb.block.blocksize=16 kb,
state.backend.rocksdb.block.cache-size=64 mb}}.
16:03:41.988 [cluster-io-thread-3] INFO
 org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined
state backend: RocksDBStateBackend{checkpointStreamBackend=File State
Backend (checkpoints: 'gs:///flink-checkpoints', savepoints:
'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
1048576), localRocksDbDirectories=[/rocksdb],
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
writeBatchSize=2097152}

Thanks!

On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma  wrote:

> Hi, Yaroslav
>
> AFAIK there is no official GCS FileSystem support in Flink. Is the GCS
> FileSystem implemented by yourself?
> Would you like to share the whole JM log?
>
> BTW: From the following log I think the implementation already has some
> retry mechanism.
> >>> Interrupted while sleeping before retry. Giving up after 1/10 retries
> for 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>
> Best,
> Guowei
>
>
> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
> yaroslav.tkache...@shopify.com> wrote:
>
>> Hi everyone,
>>
>> I'm wondering if people have experienced issues with Taskmanager failure
>> recovery when dealing with a lot of state.
>>
>> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints
>> and checkpoints. ~150 task managers with 4 slots each.
>>
>> When I run a pipeline without much state and kill one of the
>> taskmanagers, it takes a few minutes to recover (I see a few restarts), but
>> eventually when a new replacement taskmanager is registered with the
>> jobmanager things go back to healthy.
>>
>> But when I run a pipeline with a lot of state (1TB+) and kill one of the
>> taskmanagers, the pipeline never recovers, even after the replacement
>> taskmanager has joined. It just enters an infinite loop of restarts and
>> failures.
>>
>> On the jobmanager, I see an endless loop of state transitions: RUNNING
>> -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
>> It stays in RUNNING for a few seconds, but then transitions into FAILED
>> with a message like this:
>>
>>
>> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph - 
>> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
>> FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> readAddress(..) failed: Connection reset by peer (connection to '
>> 10.30.10.53/10.30.10.53:45789')
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>> ...
>> Caused by:
>> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
>> readAddress(..) failed: Connection reset by peer
>>
>>
>> Which, I guess, means a failed Taskmanager. And since there are not
>> enough task slots to run it goes into this endless loop again. It's never
>> the same Taskmanager that fails.
>>
>>
>>
>> On the Taskmanager side, things look more interesting. I see a variety of
>> exceptions:
>>
>>
>> o

Re: [DISCUSS] Feature freeze date for 1.13

2021-04-01 Thread Yuval Itzchakov
Hi All,

I would really love to merge https://github.com/apache/flink/pull/15307
prior to 1.13 release cutoff, it just needs some more tests which I can
hopefully get to today / tomorrow morning.

This is a critical fix as now predicate pushdown won't work for any stream
which generates a watermark and wants to push down predicates.

On Thu, Apr 1, 2021, 10:56 Kurt Young  wrote:

> Thanks Dawid, I have merged FLINK-20320.
>
> Best,
> Kurt
>
>
> On Thu, Apr 1, 2021 at 2:49 PM Dawid Wysakowicz 
> wrote:
>
>> Hi all,
>>
>> @Kurt @Arvid I think it's fine to merge those two, as they are pretty
>> much finished. We can wait for those two before creating the RC0.
>>
>> @Leonard Personally I'd be ok with 3 more days for that single PR. I find
>> the request reasonable and I second that it's better to have a proper
>> review rather than rush an unfinished feature and try to fix it later.
>> Moreover it got broader support. Unless somebody else objects, I think we
>> can merge this PR later and include it in RC1.
>>
>> Best,
>>
>> Dawid
>> On 01/04/2021 08:39, Arvid Heise wrote:
>>
>> Hi Dawid and Guowei,
>>
>> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We are
>> pretty much just waiting for AZP to turn green, it's separate from other
>> components, and it's a super useful feature for Flink users.
>>
>> Best,
>>
>> Arvid
>>
>> [1] https://github.com/apache/flink/pull/15054
>>
>> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:
>>
>>> Hi Guowei and Dawid,
>>>
>>> I want to request the permission to merge this feature [1], it's a
>>> useful improvement to sql client and won't affect
>>> other components too much. We were planning to merge it yesterday but we met
>>> a tricky multi-process issue which
>>> had a very high chance of hanging the tests. It took us a while to
>>> find out the root cause and fix it.
>>>
>>> Since it's not too far away from feature freeze and RC0 also not created
>>> yet, thus I would like to include this
>>> in 1.13.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-20320
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:
>>>
 Hi, community:

 Friendly reminder that today (3.31) is the last day of feature
 development. Under normal circumstances, you will not be able to submit new
 features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
 testing; you are welcome to help test it together.
 After the test is relatively stable, we will cut the release-1.13
 branch.

 Best,
 Dawid & Guowei


 On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
 wrote:

> +1 for the 31st of March for the feature freeze.
>
> Cheers,
> Till
>
> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
> wrote:
>
> > +1 for March 31st for the feature freeze.
> >
> >
> >
> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
> dwysakow...@apache.org>
> > wrote:
> >
> > > Thank you Thomas! I'll definitely check the issue you linked.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 23/03/2021 20:35, Thomas Weise wrote:
> > > > Hi Dawid,
> > > >
> > > > Thanks for the heads up.
> > > >
> > > > Regarding the "Rebase and merge" button. I find that merge option
> > useful,
> > > > especially for small simple changes and for backports. The
> following
> > > should
> > > > help to safeguard from the issue encountered previously:
> > > > https://github.com/jazzband/pip-tools/issues/1085
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
> > dwysakow...@apache.org
> > > >
> > > > wrote:
> > > >
> > > >> Hi devs, users!
> > > >>
> > > >> 1. *Feature freeze date*
> > > >>
> > > >> We are approaching the end of March which we agreed would be
> the time
> > > for
> > > >> a Feature Freeze. From the knowledge I've gathered so far it
> still seems
> > > to
> > > >> be a viable plan. I think it is a good time to agree on a
> particular
> > > date,
> > > >> when it should happen. We suggest *(end of day CEST) March 31st*
> > > >> (Wednesday next week) as the feature freeze time.
> > > >>
> > > >> Similarly as last time, we want to create RC0 on the day after
> the
> > > feature
> > > >> freeze, to make sure the RC creation process is running
> smoothly, and
> > to
> > > >> have a common testing reference point.
> > > >>
> > > >> Having said that let us remind after Robert & Dian from the
> previous
> > > >> release what it a Feature Freeze means:
> > > >>
> > > >> *B) What does feature freeze mean?*After the feature freeze, no
> new
> > > >> features are allowed to be merged to master. Only bug fixes and
> > > >> documentation improvements.
> > > >> The release 

Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

2021-04-01 Thread Robert Cullen
Guowei,

 I changed to “s3a://argo-artifacts/” but now I get the error below. BTW, I'm
using the example playground from here:

[1] https://docs.ververica.com/getting_started/installation.html

org.apache.flink.util.SerializedThrowable:
4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext: initiate
MultiPartUpload on 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext:
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key
Id you provided does not exist in our records. (Service: Amazon S3;
Status Code: 403; Error Code: InvalidAccessKeyId; Request ID:
RMD85E1G3WAK18VE; S3 Extended Request ID:
VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=;
Proxy: null), S3 Extended Request ID:
VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=:InvalidAccessKeyId
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
~[?:?]
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) ~[?:?]
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) ~[?:?]
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
~[?:?]
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) ~[?:?]
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) ~[?:?]
at 
org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
~[?:?]
at 
org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198)
~[?:?]
at 
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:62)
~[?:?]
at 
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:253)
~[?:?]
at 
org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68)
~[?:?]
at 
org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:78)
~[?:?]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90)
~[flink-table-blink_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.openNewInProgressFile(RowWiseBucketWriter.java:34)
~[flink-table-blink_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamO

Re: DataStream<GenericRecord> from kafka topic

2021-04-01 Thread Arvid Heise
Arian gave good pointers, but I'd go even further: you should have ITCases
where you pretty much just execute a mini job with docker-based Kafka and
run it automatically.
I strongly recommend checking out testcontainers [1]; it makes writing such
a test a really smooth experience.

[1] https://www.testcontainers.org/modules/kafka/


On Wed, Mar 31, 2021 at 2:29 PM Arian Rohani  wrote:

> The issue at hand is that the record contains an unmodifiable collection
> which the kryo serialiser attempts to modify by first initialising the
> object and then adding items to the collection (iirc).
>
> Caused by: java.lang.UnsupportedOperationException
>> at
>> java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>
>
> Without knowing the specifics of what it is exactly you are trying to
> deserialise I can only attempt to give a generic answer which is to try
> something like:
>
>
>> StreamExecutionEnvironment see =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> Class<?> unmodColl =
>> Class.forName("java.util.Collections$UnmodifiableCollection");
>> see.getConfig().addDefaultKryoSerializer(unmodColl,
>> UnmodifiableCollectionsSerializer.class);
>
>
> An even better approach is to set up a local sandbox environment in docker
> with Kafka and a sink of your choice and simply run the application
> from the main method in debug mode, setting a breakpoint right before it
> throws the exception.
>
> Kind regards,
> Arian Rohani
>
>
> On Wed, Mar 31, 2021 at 13:27, Matthias Pohl wrote:
>
>> Hi Maminspapin,
>> I haven't worked with Kafka/Flink, yet. But have you had a look at the
>> docs about the DeserializationSchema [1]? It
>> mentions ConfluentRegistryAvroDeserializationSchema. Is this something
>> you're looking for?
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>>
>> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin  wrote:
>>
>>> I tried this:
>>>
>>> 1. Schema (found in stackoverflow)
>>>
>>> class GenericRecordSchema implements
>>> KafkaDeserializationSchema<GenericRecord> {
>>>
>>> private String registryUrl;
>>> private transient KafkaAvroDeserializer deserializer;
>>>
>>> public GenericRecordSchema(String registryUrl) {
>>> this.registryUrl = registryUrl;
>>> }
>>>
>>> @Override
>>> public boolean isEndOfStream(GenericRecord nextElement) {
>>> return false;
>>> }
>>>
>>> @Override
>>> public GenericRecord deserialize(ConsumerRecord<byte[], byte[]>
>>> consumerRecord) throws Exception {
>>> checkInitialized();
>>> return (GenericRecord)
>>> deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>>> }
>>>
>>> @Override
>>> public TypeInformation<GenericRecord> getProducedType() {
>>> return TypeExtractor.getForClass(GenericRecord.class);
>>> }
>>>
>>> private void checkInitialized() {
>>> if (deserializer == null) {
>>> Map<String, Object> props = new HashMap<>();
>>>
>>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>> registryUrl);
>>>
>>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>> false);
>>> SchemaRegistryClient client =
>>> new CachedSchemaRegistryClient(
>>> registryUrl,
>>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>> deserializer = new KafkaAvroDeserializer(client, props);
>>> }
>>> }
>>> }
>>>
>>> 2. Consumer
>>>
>>> private static FlinkKafkaConsumer<GenericRecord> getConsumer(String
>>> topic) {
>>>
>>> return new FlinkKafkaConsumer<>(
>>> topic,
>>> new GenericRecordSchema("http://xxx.xx.xxx.xx:8081";),
>>> getConsumerProperties());
>>> }
>>>
>>> But when I start the app, the following error is happen:
>>>
>>> com.esotericsoftware.kryo.KryoException:
>>> java.lang.UnsupportedOperationException
>>> Serialization trace:
>>> reserved (org.apache.avro.Schema$Field)
>>> fieldMap (org.apache.avro.Schema$RecordSchema)
>>> schema (org.apache.avro.generic.GenericData$Record)
>>> at
>>>
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>> at
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>> at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> at
>>>
>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>> at
>>>
>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>> at
>>>
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>> at
>>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>> at
>>>

Re: Re: How does Flink SQL read Avro union?

2021-04-01 Thread Arvid Heise
Hi Vincent,

yes, if you cannot influence the schema, then there is little you can do on
the SQL level and your workaround is probably the only way to go.

However, I'd encourage you to speak with the other consumers/producers to
find a way without unions. They are also ugly to use in all strongly typed
languages (except Scala) and they are not forward and backward-compatible.
Chances are high that the other users are not Avro experts and would
benefit from these insights. I can guarantee you that it's just a matter of
time until unions will break things and annoy the hell out of you (probably
not in this topic, but if you do not establish best practices without
unions, it will happen with another topic).

On Wed, Mar 31, 2021 at 11:54 AM Vincent Dong  wrote:

> Hi Arvid,
>
> I cannot decide the schema of the Kafka source topic since others also
> consume this topic.
> I use the Flink DataStream API to consume the topic and then transform it to a
> schema without a union field in it, to avoid the Flink SQL issue.
>
> Cheers,
> Vincent
>
> At 2021-03-22 22:04:53, "Arvid Heise"  wrote:
>
> Hi Vincent,
>
> I'm not well into Flink SQL, so I'm pulling in Jark.
>
> I have stopped using union records in your way and instead only use
> nullable fields (technically also a union field but much easier to handle
> in all languages).
>
> So if you have a way to change the schema, maybe try it out:
>   record RowEvent {
> union { null, ItemRow } item_row default null;
> union { null, RefundRow } refund_row default null;
>   }
>
>
>
> On Thu, Mar 18, 2021 at 7:35 AM Vincent Dong  wrote:
>
>> Hi All,
>>   How does Flink SQL read a Kafka Avro message which has a union field?
>>   For me,  avro schema is defined as following,
>> ```
>>   record ItemRow {
>> string num_iid;
>> string has_showcase;
>> string jdp_created;
>>   }
>>
>>   record RefundRow {
>> string refund_id;
>> string status;
>> string jdp_created;
>>   }
>>
>>   record RowEvent {
>> union { ItemRow, RefundRow } item_row;
>>   }
>> ```
>> Now I'm sure that for a specific Kafka topic, the item_row in all
>> messages is RefundRow, but I don't know how to define the source table and
>> query it.
>> Can I define the table to force Flink SQL to convert all messages to
>> RefundRow? Then I can `select status, refund_id from the_converted_table`.
>>
>>
>> Thanks
>> Vincent Dong
>>
>>
>>
>>
>
>
>
>


Re: Proper way to get DataStream<GenericRecord>

2021-04-01 Thread Arvid Heise
Hi,

it seems as if the data is written with a Confluent registry in mind, so
you cannot use option 1: the Kafka record is invalid Avro, as it contains a
5-byte prefix that identifies the schema.

So the second way is the way to go, and it actually works as intended: the error
tells you that you are reading with a schema that does not match the data. Once
you use the correct schema (user_visit.Envelope), it will work.
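
A minimal sketch of that second option with the topic's writer schema. The
Envelope field list, bootstrap servers and registry URL are placeholders; the
real schema lives in the registry / the producing application:

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class EnvelopeConsumerSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder for the actual user_visit.Envelope writer schema; in
        // practice load the .avsc used by the producer instead of reflecting
        // over an unrelated class such as cep.model.User.
        Schema envelopeSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"user_visit\","
                + "\"fields\":[{\"name\":\"userId\",\"type\":\"string\"}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "envelope-test");

        // The Confluent flavour strips the 5-byte magic/schema-id prefix
        // before decoding, which the plain AvroDeserializationSchema does not.
        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
            "test_topic",
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                envelopeSchema, "http://xxx.xx.xxx.xx:8081"),
            props);

        DataStream<GenericRecord> stream = env.addSource(consumer);
        stream.print("users");
        env.execute();
    }
}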

On Wed, Mar 31, 2021 at 1:46 PM Matthias Pohl 
wrote:

> Hi Maminspapin again,
> have you checked whether your topic actually contains data that matches
> your schema specified through cep.model.User?
>
> Best,
> Matthias
>
> On Tue, Mar 30, 2021 at 3:39 PM Maminspapin  wrote:
>
>> Hi,
>>
>> I'm trying to solve a task with getting data from topic. This topic keeps
>> avro format data.
>>
>> I wrote next code:
>>
>>  public static void main(String[] args) throws Exception {
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> Schema schema = ReflectData.get().getSchema(User.class);
>> FlinkKafkaConsumer<GenericRecord> userConsumer = new
>> FlinkKafkaConsumer<>(
>>"test_topic",
>> *// First*
>> AvroDeserializationSchema.forGeneric(schema),
>> *// Second*
>> //
>> ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
>> "http://xxx.xx.xxx.xx:8081";),
>> getConsumerProperties());
>>
>> DataStream<GenericRecord> userStream =
>> env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
>> userStream.print("users");
>>
>> env.execute();
>> }
>>
>> So, as I understand it, there are two ways to get the result:
>> 1. AvroDeserializationSchema.forGeneric(schema)
>> 2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
>> "http://xxx.xx.xxx.xx:8081";)
>>
>> And I use ReflectData.get().getSchema(User.class) to get schema.
>>
>>
>> Please, Flink gurus, tell me if I am on the right track or not.
>>
>>
>> If I use First way, there is next error:
>>
>> java.io.EOFException
>> at org.apache.avro.io
>> .BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
>> at org.apache.avro.io
>> .BinaryDecoder.readInt(BinaryDecoder.java:150)
>> at org.apache.avro.io
>> .ValidatingDecoder.readInt(ValidatingDecoder.java:82)
>>
>> If I use Second way, there is next error:
>>
>> Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
>> expecting cep.model.User, missing required field userId
>> at org.apache.avro.io
>> .ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>> at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>>
>> How can I get the correct result?
>>
>> Sorry, if duplicated:
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html
>>
>> Today is the third day I'm working on this issue (((
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: Checkpoint timeouts at times of high load

2021-04-01 Thread Guowei Ma
Hi,
There are many reasons that could lead to a checkpoint timeout.
Could you share some more detailed information about the checkpoints? For
example, the detailed checkpoint statistics from the web UI [1]. And which
Flink version do you use?
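
As a side note, the checkpoint timeout and pacing can also be adjusted on the
CheckpointConfig while you investigate. A minimal sketch (the interval and
timeout values are placeholders to adapt to your job):

```
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {

    public static StreamExecutionEnvironment configuredEnv() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint interval in ms (placeholder)

        CheckpointConfig cfg = env.getCheckpointConfig();
        cfg.setCheckpointTimeout(10 * 60 * 1000);   // raise the 6-minute timeout
        cfg.setMinPauseBetweenCheckpoints(30_000);  // let the job catch up between checkpoints
        cfg.setMaxConcurrentCheckpoints(1);         // avoid overlapping checkpoints under load
        return env;
    }
}
```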

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html

Best,
Guowei


On Thu, Apr 1, 2021 at 4:33 PM Geldenhuys, Morgan Karl <
morgan.geldenh...@tu-berlin.de> wrote:

> Hi Community,
>
>
> I have a number of Flink jobs with varying checkpoint intervals and a large
> amount of operator state running inside my session cluster. In times of high
> load, the jobs fail due to checkpoint timeouts (set to 6 minutes). I can only
> assume this is because the latencies for saving checkpoints increase at these
> times of high load. I have a 30 node HDFS cluster for checkpoints... however I
> see that only 4 of these nodes are being used for storage. Is there a way of
> ensuring the load is evenly spread? Could there be another reason for these
> checkpoint timeouts? Events are consumed from Kafka and produced to Kafka with
> EXACTLY ONCE guarantees enabled.
>
>
> Thank you very much!
>
>
> M.
>


Re: Flink Taskmanager failure recovery and large state

2021-04-01 Thread Guowei Ma
Hi, Yaroslav

AFAIK there is no official GCS FileSystem support in Flink. Did you implement
the GCS FileSystem yourself?
Could you also share the full JobManager log?

BTW: judging from the following log line, the implementation already seems to
have some retry mechanism:
>>> Interrupted while sleeping before retry. Giving up after 1/10 retries
for 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d

Best,
Guowei


On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
yaroslav.tkache...@shopify.com> wrote:

> Hi everyone,
>
> I'm wondering if people have experienced issues with Taskmanager failure
> recovery when dealing with a lot of state.
>
> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints
> and checkpoints. ~150 task managers with 4 slots each.
>
> When I run a pipeline without much state and kill one of the
> taskmanagers, it takes a few minutes to recover (I see a few restarts), but
> eventually when a new replacement taskmanager is registered with the
> jobmanager things go back to healthy.
>
> But when I run a pipeline with a lot of state (1TB+) and kill one of the
> taskmanagers, the pipeline never recovers, even after the replacement
> taskmanager has joined. It just enters an infinite loop of restarts and
> failures.
>
> On the jobmanager, I see an endless loop of state transitions: RUNNING
> -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
> It stays in RUNNING for a few seconds, but then transitions into FAILED
> with a message like this:
>
>
> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph - 
> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
> FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> readAddress(..) failed: Connection reset by peer (connection to '
> 10.30.10.53/10.30.10.53:45789')
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> ...
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> readAddress(..) failed: Connection reset by peer
>
>
> Which, I guess, means a failed Taskmanager. And since there are not enough
> task slots to run, it goes into this endless loop again. It's never the same
> Taskmanager that fails.
>
>
>
> On the Taskmanager side, things look more interesting. I see a variety of
> exceptions:
>
>
> org.apache.flink.runtime.taskmanager.Task -  (141/624)#7
> (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
> org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution
> attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.
>
>
> also
>
>
> WARNING: Failed read retry #1/10 for
> 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'.
> Sleeping...
> java.nio.channels.ClosedByInterruptException
> at
> java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown
> Source)
> at
> java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown
> Source)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
> at
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
> at java.base/java.io.DataInputStream.read(Unknown Source)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
> at java.base/java.io.InputStream.read(Unknown Source)
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
> ...
>
>
> and
>
>
> SEVERE: Interrupted while sleeping before retry. Giving up after 1/10
> retries for
> 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
> 20:52:46.894 [ (141/624)#7] ERROR
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder -
> Caught unexpected exception.
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
> at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
> at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
> at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
> at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>
>
> also
>
>
> 20:52:46.895 [ (141/624)#7] WARN
>  org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
> Exception while restoring keyed state backend for
> KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from
>

Re: Restoring from Flink Savepoint in Kubernetes not working

2021-04-01 Thread Matthias Pohl
The logs would have helped to better understand what you were doing.

The stacktrace you shared indicates that you either asked for the status of
a savepoint creation that had already been completed and was, therefore,
removed from the operations cache or you used some job ID/request ID
pair that was not connected with any savepoint creation operation.
The operations are only cached for 300 seconds before being removed from
the cache. You could verify that the specific operation did expire and was
removed from the cache in the logs [1] stating something like: "Evicted
result with trigger id {} because its TTL of {}s has expired."

But you should also be able to verify the completion of the savepoint in
the logs.
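
For completeness, a sketch of the trigger-and-poll flow against the REST API
(job ID, trigger ID, and target directory are placeholders):

```
# Trigger a savepoint; the response contains a "request-id" (the trigger ID).
curl -X POST http://localhost:8081/jobs/<job-id>/savepoints \
  -H 'Content-Type: application/json' \
  -d '{"target-directory": "s3://<bucket>/savepoints", "cancel-job": false}'

# Poll the status within the 300s TTL; once COMPLETED, the "operation.location"
# field holds the savepoint path to pass to --fromSavepoint.
curl http://localhost:8081/jobs/<job-id>/savepoints/<trigger-id>
```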

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java#L104

On Wed, Mar 31, 2021 at 4:46 PM Claude M  wrote:

> Thanks for your reply.  I'm using the flink docker
> image flink:1.12.2-scala_2.11-java8.  Yes, the folder was created in S3.  I
> took a look at the UI and it showed the following:
>
> *Latest Restore ID: 49, Restore Time: 2021-03-31 09:37:43, Type: Checkpoint,
> Path: s3:fcc82deebb4565f31a7f63989939c463/chk-49*
>
> However, this is different from the savepoint path I specified.  I
> specified the following:
>
> *s3:savepoint2/savepoint-9fe457-504c312ffabe*
>
> Is there anything specific you're looking for in the logs?  I did not find
> any exceptions and there is a lot of sensitive information I would have to
> extract from it.
>
> Also, this morning, I tried creating another savepoint.  It first showed
> it was In Progress.
>
> curl 
> http://localhost:8081/jobs/fcc82deebb4565f31a7f63989939c463/savepoints/4d19307dd99337257c4738871b1c63d8
> {"status":{"id":"IN_PROGRESS"},"operation":null}
>
> Then later when I tried to check the status, I saw the attached
> exception.
>
> In the UI, I see the following:
>
> *Latest Failed Checkpoint ID: 50, Failure Time: 2021-03-31 09:34:43, Cause:
> Asynchronous task checkpoint failed.*
>
> What does this failure mean?
>
>
> On Wed, Mar 31, 2021 at 9:22 AM Matthias Pohl 
> wrote:
>
>> Hi Claude,
>> thanks for reaching out to the Flink community. Could you provide the
>> Flink logs for this run to get a better understanding of what's going on?
>> Additionally, what exact Flink 1.12 version are you using? Did you also
>> verify that the snapshot was created by checking the actual folder?
>>
>> Best,
>> Matthias
>>
>> On Wed, Mar 31, 2021 at 4:56 AM Claude M  wrote:
>>
>>> Hello,
>>>
>>> I have Flink setup as an Application Cluster in Kubernetes, using Flink
>>> version 1.12.  I created a savepoint using the curl command and the status
>>> indicated it was completed.  I then tried to relaunch the job from that
>>> save point using the following arguments as indicated in the doc found
>>> here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes
>>>
>>> args: ["standalone-job", "--job-classname", "", "--job-id",
>>> "", "--fromSavepoint", "s3:///",
>>> "--allowNonRestoredState"]
>>>
>>> After the job launches, I check the offsets and they are not the same as
>>> when the savepoint was created.  The job id passed in also does not match
>>> the job id that was launched.  I even put an incorrect savepoint path to
>>> see what happens and there were no errors in the logs and the job still
>>> launches.  It seems these arguments are not even being evaluated.  Any
>>> ideas about this?
>>>
>>>
>>> Thanks
>>>
>>


Checkpoint timeouts at times of high load

2021-04-01 Thread Geldenhuys, Morgan Karl
Hi Community,


I have a number of Flink jobs with varying checkpoint intervals and a large amount 
of operator state running inside my session cluster. In times of high load, the jobs 
fail due to checkpoint timeouts (set to 6 minutes). I can only assume this is because 
the latencies for saving checkpoints increase at these times of high load. I have a 
30 node HDFS cluster for checkpoints... however I see that only 4 of these nodes are 
being used for storage. Is there a way of ensuring the load is evenly spread? Could 
there be another reason for these checkpoint timeouts? Events are consumed from Kafka 
and produced to Kafka with EXACTLY ONCE guarantees enabled.


Thank you very much!


M.


Re: JDBC connector support for JSON

2021-04-01 Thread Matthias Pohl
Hi Fanbin,
I'm not that familiar with the Flink SQL features, but it looks like the
JDBC connector does not support JSON, as stated in the documentation [1]. You
might work around it by implementing your own user-defined function [2].
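
A rough sketch of that workaround (Jackson is an assumption here, and the
JSON_FIELD name as well as the table and column names are hypothetical):
declare the JSON column as STRING in the JDBC DDL and extract nested fields
with a scalar UDF.

```
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.functions.ScalarFunction;

// A scalar UDF returning the text value at a JSON pointer, e.g.
// JSON_FIELD(jsonField1, '/jsonField2/field3'), or NULL if the path is missing
// or the input is not valid JSON.
public class JsonField extends ScalarFunction {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public String eval(String json, String pointer) {
        if (json == null) {
            return null;
        }
        try {
            JsonNode node = MAPPER.readTree(json).at(pointer);
            return node.isMissingNode() || node.isNull() ? null : node.asText();
        } catch (Exception e) {
            return null;
        }
    }
}
```

Registered via tEnv.createTemporarySystemFunction("JSON_FIELD", JsonField.class),
it could then be queried as
SELECT JSON_FIELD(jsonField1, '/jsonField2/field3') FROM aTable.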

I hope this helps.
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#data-type-mapping
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html

On Wed, Mar 31, 2021 at 7:04 AM Fanbin Bu  wrote:

> Hi,
>
> For a streaming job that uses Kafka connector, this doc
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/json.html#format-options
> shows that we can parse the JSON data format. However, it does not seem
> like the Flink JDBC connector supports the JSON data type, at least from this doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#data-type-mapping
> .
>
> So the question is, does the JDBC connector also have this capability? If not,
> what's required to enable it? At the end of the day, I would like to see
> something like this:
>
> create table aTable(field1 type, jsonField1 ROW<jsonField2 ROW<field3 type>>)
> with
> (
> 'connector' = 'jdbc',
> 'url' = '...',
> 'table-name' = 'my-table-with-json-column',
> ...
> )
>
> tEnv.executeSql("select jsonField1.jsonField2.field3 from aTable")
>
> Thanks,
> Fanbin
>


Re: ARM support

2021-04-01 Thread Guowei Ma
Hi, Rex

I think Flink does not have an official release that supports the ARM
architecture. There have been some efforts and discussions [1][2][3] about
supporting it, and you could find some builds at openlabtesting [4].
But AFAIK there is no clear timeline for that (correct me if I missed
something). There is also an earlier discussion [5] from which you might gain
some insight.

[1] https://issues.apache.org/jira/browse/FLINK-13448
[2]
https://lists.apache.org/thread.html/a564836a3c7cc5300bec7729c2af1ad9d611d526bb59dd6cca72cc7b%40%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/2399c8a701bced2266f9658719807b98a2e593a99b949f50e9a1ab1a%40%3Cdev.flink.apache.org%3E
[4] http://status.openlabtesting.org/builds?project=apache%2Fflink
[5]
https://lists.apache.org/thread.html/5c4c75a2de979ed7ef1c661c15dd252569e598a374c27042b38d078b%40%3Cdev.flink.apache.org%3E

Best,
Guowei


On Thu, Apr 1, 2021 at 3:55 AM Rex Fenley  wrote:

> Hello,
>
> We would like to run Flink on ARM yet haven't found any resources
> indicating that this is yet possible. We are wondering what the timeline is
> for Flink supporting ARM. Given that all Mac Books are moving to ARM and
> that AWS is excitedly supporting ARM, it seems important that Flink also
> supports running on ARM.
>
> Thank you
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: [DISCUSS] Feature freeze date for 1.13

2021-04-01 Thread Kurt Young
Thanks Dawid, I have merged FLINK-20320.

Best,
Kurt


On Thu, Apr 1, 2021 at 2:49 PM Dawid Wysakowicz 
wrote:

> Hi all,
>
> @Kurt @Arvid I think it's fine to merge those two, as they are pretty much
> finished. We can wait for those two before creating the RC0.
>
> @Leonard Personally I'd be ok with 3 more days for that single PR. I find
> the request reasonable and I second that it's better to have a proper
> review rather than rush an unfinished feature and try to fix it later.
> Moreover it got broader support. Unless somebody else objects, I think we
> can merge this PR later and include it in RC1.
>
> Best,
>
> Dawid
> On 01/04/2021 08:39, Arvid Heise wrote:
>
> Hi Dawid and Guowei,
>
> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We are
> pretty much just waiting for AZP to turn green, it's separate from other
> components, and it's a super useful feature for Flink users.
>
> Best,
>
> Arvid
>
> [1] https://github.com/apache/flink/pull/15054
>
> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:
>
>> Hi Guowei and Dawid,
>>
>> I would like to request permission to merge this feature [1]; it's a useful
>> improvement to the SQL client and won't affect
>> other components too much. We were planning to merge it yesterday but hit
>> a tricky multi-process issue which
>> had a very high chance of hanging the tests. It took us a while to find
>> out the root cause and fix it.
>>
>> Since we're not far past the feature freeze and RC0 has not been created
>> yet, I would like to include this
>> in 1.13.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20320
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:
>>
>>> Hi, community:
>>>
>>> Friendly reminder that today (3.31) is the last day of feature
>>> development. Under normal circumstances, you will not be able to submit new
>>> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
>>> testing; everyone is welcome to help test.
>>> Once testing shows it is relatively stable, we will cut the release-1.13 branch.
>>>
>>> Best,
>>> Dawid & Guowei
>>>
>>>
>>> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
>>> wrote:
>>>
 +1 for the 31st of March for the feature freeze.

 Cheers,
 Till

 On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
 wrote:

 > +1 for March 31st for the feature freeze.
 >
 >
 >
 > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
 dwysakow...@apache.org>
 > wrote:
 >
 > > Thank you Thomas! I'll definitely check the issue you linked.
 > >
 > > Best,
 > >
 > > Dawid
 > >
 > > On 23/03/2021 20:35, Thomas Weise wrote:
 > > > Hi Dawid,
 > > >
 > > > Thanks for the heads up.
 > > >
 > > > Regarding the "Rebase and merge" button: I find that merge option useful,
 > > > especially for small simple changes and for backports. The following should
 > > > help to safeguard against the issue encountered previously:
 > > > https://github.com/jazzband/pip-tools/issues/1085
 > > >
 > > > Thanks,
 > > > Thomas
 > > >
 > > >
 > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
 > dwysakow...@apache.org
 > > >
 > > > wrote:
 > > >
 > > >> Hi devs, users!
 > > >>
 > > >> 1. *Feature freeze date*
 > > >>
 > > >> We are approaching the end of March which we agreed would be the time for
 > > >> a Feature Freeze. From the knowledge I've gathered so far it still seems to
 > > >> be a viable plan. I think it is a good time to agree on a particular date
 > > >> when it should happen. We suggest *(end of day CEST) March 31st*
 > > >> (Wednesday next week) as the feature freeze time.
 > > >>
 > > >> Similarly as last time, we want to create RC0 on the day after the feature
 > > >> freeze, to make sure the RC creation process is running smoothly, and to
 > > >> have a common testing reference point.
 > > >>
 > > >> Having said that, let us repeat, following Robert & Dian from the previous
 > > >> release, what a Feature Freeze means:
 > > >>
 > > >> *B) What does feature freeze mean?* After the feature freeze, no new
 > > >> features are allowed to be merged to master. Only bug fixes and
 > > >> documentation improvements.
 > > >> The release managers will revert new feature commits after the feature
 > > >> freeze.
 > > >> Rationale: The goal of the feature freeze phase is to improve the system
 > > >> stability by addressing known bugs. New features tend to introduce new
 > > >> instabilities, which would prolong the release process.
 > > >> If you need to merge a new feature after the freeze, please open a
 > > >> discussion on the dev@ list. If there are no objections by a PMC member
 > > >> within 48 (workday) hours, t