Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)
Hi Jonas,

Just a thought, could you try this policy? If I recall correctly, you need ListBucket on the bucket itself, whereas the other actions can have a path prefix like the "/*" you added:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListBucket",
                "s3:Get*",
                "s3:Put*",
                "s3:Delete*"
            ],
            "Resource": [
                "arn:aws:s3:::-flink-dev",
                "arn:aws:s3:::-flink-dev/*"
            ],
            "Effect": "Allow"
        }
    ]
}

Svend

On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
> Hey Matthias,
>
> Yes, I have followed the documentation on the link you provided - and decided to go for the recommended approach of using IAM roles.
> The hive.s3.use-instance-credentials configuration parameter I got from [1] (first bullet) since I am using the flink-s3-fs-presto plugin - which says:
>
> `flink-s3-fs-presto`, registered under the scheme *s3://* and *s3p://*, is based on code from the Presto project <https://prestodb.io/>. You can configure it using the same configuration keys as the Presto file system <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>, by adding the configurations to your `flink-conf.yaml`. The Presto S3 implementation is the recommended file system for checkpointing to S3.
>
> It's possible I am misunderstanding it?
>
> Best,
> Jonas
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>
> On Thu, 26 Aug 2021 at 16:32, Matthias Pohl wrote:
>> Hi Jonas,
>> have you included the s3 credentials in the Flink config file like it's described in [1]? I'm not sure about this hive.s3.use-instance-credentials being a valid configuration parameter.
>>
>> Best,
>> Matthias
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>>
>> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob wrote:
>>> Hey,
>>>
>>> I am setting up HA on a standalone Kubernetes Flink application job cluster.
>>> Flink (1.12.5) is used and I am using S3 as the storage backend.
>>>
>>> * The JobManager fails shortly after starting with the following errors (apologies in advance for the length), and I can't understand what's going on.
>>> * First I thought it may be due to missing Delete privileges of the IAM role and updated that, but the problem persists.
>>> * The S3 bucket configured s3:///recovery is empty.
>>>
>>> configmap.yaml
>>> flink-conf.yaml: |+
>>>   jobmanager.rpc.address: {{ $fullName }}-jobmanager
>>>   jobmanager.rpc.port: 6123
>>>   jobmanager.memory.process.size: 1600m
>>>   taskmanager.numberOfTaskSlots: 2
>>>   taskmanager.rpc.port: 6122
>>>   taskmanager.memory.process.size: 1728m
>>>   blob.server.port: 6124
>>>   queryable-state.proxy.ports: 6125
>>>   parallelism.default: 2
>>>   scheduler-mode: reactive
>>>   execution.checkpointing.interval: 10s
>>>   restart-strategy: fixed-delay
>>>   restart-strategy.fixed-delay.attempts: 10
>>>   high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>   kubernetes.cluster-id: {{ $fullName }}
>>>   high-availability.storageDir: s3://-flink-{{ .Values.environment }}/recovery
>>>   hive.s3.use-instance-credentials: true
>>>   kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
>>>
>>> role.yaml
>>> kind: Role
>>> apiVersion: rbac.authorization.k8s.io/v1
>>> metadata:
>>>   name: {{ $fullName }}
>>>   namespace: {{ $fullName }}
>>>   labels:
>>>     app: {{ $appName }}
>>>     chart: {{ template "thoros.chart" . }}
>>>     release: {{ .Release.Name }}
>>>     heritage: {{ .Release.Service }}
>>>
>>> rules:
>>>   - apiGroups: [""]
>>>     resources: ["configmaps"]
>>>     verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>>>
>>> aws IAM policy
>>> {
>>>   "Version": "2012-10-17&
Re: Production Grade GitOps Based Kubernetes Setup
Hi all,

I reached out [1] to Filipe Regadas, the author of the Spotify fork of the GCP k8s operator you linked in your message. He confirms he's actively working on it and would welcome PRs and community input. I already have a modest PR that I'll submit to him some time this week.

This seems to me a promising option to look into.

Svend

[1] https://github.com/spotify/flink-on-k8s-operator/issues/82

On Mon, 9 Aug 2021, at 12:36 PM, Niklas Wilcke wrote:
> Hi Yuval,
>
> thank you for sharing all the information. I forgot to mention the Lyft operator. Thanks for "adding" it to the list.
> About the dual-cluster approach during upgrade, I have some doubts about the resource usage. If you are operating some "big" jobs, that would mean you always have to provide enough resources to run two of them in parallel during the upgrade, or is there some workaround (downscaling, upscaling) available?
> I will further investigate how option three, "GoogleCloudPlatform/flink-on-k8s-operator", is implementing the upgrade process.
>
> I agree that there is a need for a community-based operator project. It is unfortunate that both "relevant" projects (Lyft, GCP) have been more or less abandoned. The only thing left with some activity that I can see is a fork of the GCP operator from Spotify [0], but there is only one person involved.
>
> Regards,
> Niklas
>
> [0] https://github.com/spotify/flink-on-k8s-operator
>
> UNIBERG GmbH
> Simon-von-Utrecht-Straße 85a
> 20359 Hamburg
>
> niklas.wil...@uniberg.com
> Mobile: +49 160 9793 2593
> Office: +49 40 2380 6523
>
> UNIBERG GmbH, Dorfstraße 3, 23816 Bebensee
> Registergericht / Register: Amtsgericht Kiel HRB SE-1507
> Geschäftsführer / CEOs: Andreas Möller, Martin Ulbricht
>
> Informationen zum Datenschutz / Privacy Information: https://www.uniberg.com/impressum.html
>
>> On 6. Aug 2021, at 16:59, Yuval Itzchakov wrote:
>>
>> Hi Niklas,
>>
>> We are currently using the Lyft operator for Flink in production (https://github.com/lyft/flinkk8soperator), which is an additional alternative. The project itself is pretty much in a zombie state, but commits happen every now and then.
>>
>> 1. Native Kubernetes could definitely work with GitOps, it would just require you to do lots of steps "by hand" in terms of application upgrade and rollover.
>> 2. We're using Lyft's operator as mentioned above. It mostly works well; there were several issues we had along the way but they were mostly resolved. One feature that is missing for us specifically is being able to perform an upgrade by first savepointing and killing the existing cluster and only then deploying a new one (their approach is dual, meaning have two clusters up and running before doing the rollover).
>> 3. In its current state it looks more like a side project than an actively maintained operator.
>> 4. Ververica is definitely an option; we haven't tested their operator, not sure about the maturity level yet.
>>
>> I think a Flink community based operator for k8s is a much needed project (which I'd be happy to contribute to).
>>
>> On Fri, Aug 6, 2021, 14:49 Niklas Wilcke wrote:
>>> Hi Flink Community,
>>>
>>> I'm currently assessing the situation about how to properly deploy Flink on Kubernetes via GitOps. There are some options available to deploy Flink on Kubernetes, which I would like to discuss. In general we are looking for an open source or at least unpaid solution, but I don't exclude paid solutions from the beginning.
>>> I see the following options:
>>>
>>> 1. Kubernetes Standalone [1]
>>> * Seems to be deprecated, since the docs state to use Native Kubernetes instead
>>> 2. Native Kubernetes [2]
>>> * Doesn't seem to implement the Kubernetes operator pattern
>>> * Seems to require command line activities to be operated / upgraded (not GitOps compatible out of the box)
>>> 3. "GoogleCloudPlatform/flink-on-k8s-operator" Operator [3]
>>> * Seems not to be well maintained / documented
>>> * We had some trouble with crashes during configuration changes, but we need to investigate further
>>> * There is a "maintained" fork from Spotify, which could be an option
>>> 4. Flink Native Kubernetes Operator [4]
>>> * Seems to be a private project from a Flink Committer, which might not be
Re: Checkpoints fail when trying to copy from Kafka to S3 since "some tasks have been finished"
Hi again,

After a bit of experimentation (and actually reading the bug report I linked), I realized the issue was that the parallelism was higher than the number of Kafka partitions => reducing the parallelism enabled the checkpoints to work as expected.

=> Since it seems unsupported, maybe KafkaSource should refuse to even start if its configured parallelism is higher than the Kafka partitioning? Otherwise, this error condition is rather difficult to interpret IMHO. I'm happy to open a Jira and work on that if that's desired?

Best regards,

Svend

On Wed, 4 Aug 2021, at 11:32 AM, Svend wrote:
> Hi Robert,
>
> Thanks for the feed-back.
>
> You are correct, the behavior is indeed different: when I make the source bounded, the application eventually stops, whereas without that setting it runs forever.
>
> In both cases neither checkpoints nor data is being written to the filesystem.
>
> I re-ran the experiment to get more info:
>
> * When making the kafka source unbounded (which is not what I want), I notice logs telling that tasks associated with the Kafka source get unregistered in the beginning:
>
> 2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.Task Source: Kafka Source -> Sink: Unnamed (3/8)#0 - Freeing task resources for Source: Kafka Source -> Sink: Unnamed (3/8)#0 (70e93520406e12a1e7480a3344f4064d).
> 2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.TaskExecutor flink-akka.actor.default-dispatcher-6 - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Kafka Source -> Sink: Unnamed (8/8)#0 f02843c05005daced3d5271832f25559.
>
> Later on, it seems each checkpoint fails while complaining that some tasks are not running => would that be caused by the finished tasks above?
>
> 2021-08-04 10:55:31,463 [INFO org.apa.fli.run.che.CheckpointCoordinator Checkpoint Timer - Failed to trigger checkpoint for job 321fc6243d8aa091bdb8b913b7c3a679 since some tasks of job 321fc6243d8aa091bdb8b913b7c3a679 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.
>
> * When setting the kafka source back to bounded, I also notice such "unregistered" log messages, before one "Checkpoint Timer - Failed to trigger c" log, and then the job finishes.
>
> By digging a little, this seems similar to this bug: https://issues.apache.org/jira/browse/FLINK-2491
>
> I've tried to add a supplementary ProcessFunction in the pipeline, just to have one more stateful thing and hope to trigger the checkpoint, though without success.
>
> On Tue, 3 Aug 2021, at 1:33 PM, Robert Metzger wrote:
>> Hi Svend,
>> I'm a bit confused by this statement:
>>
>>> * In streaming mode, with checkpointing but removing the `setBounded()` on the kafka source yields the same result
>>
>> My expectation would be that the source runs forever, if it is not bounded. Are you sure this error message is not coming from another task?
>>
>> On Sat, Jul 31, 2021 at 11:23 AM Svend wrote:
>>> Hi everyone,
>>>
>>> I'm failing to write a simple Flink 1.13.1 application with the DataStream API that reads from kafka and writes to parquet.
>>>
>>> My intention is to run this as a job that reads what is currently in kafka, shuts down when reaching the current end of each partition and picks up from there next time it's started.
>>>
>>> I've tried several variations, I can't get anything to work properly:
>>>
>>> * In streaming mode, enabling checkpointing and setting the kafkaSource to bounded (see code sample below), the application fails to perform checkpoints, complaining about:
>>>
>>> "Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks of job ... has been finished"
>>>
>>> => no parquet part gets written, no checkpoint gets written and no kafka offset gets committed
>>>
>>> * In streaming mode, with checkpointing but removing the `setBounded()` on the kafka source, yields the same result
>>>
>>> * I also tried in batch mode, removing the checkpoint, switching the StreamingFileSink for a FileSink and using Kafka's "auto.commit.interval.ms" => in that case I'm getting some parquet output and kafka offsets are committed, but the application shuts down before flushing the offset of what has been read, s.t. the latest kafka events will be
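Regarding the parallelism-vs-partitions mismatch identified above: a minimal, self-contained sketch of the kind of pre-flight guard suggested for KafkaSource. The class and method names are made up for illustration; this is not Flink API, just the check it could perform before sub-tasks with no assigned partition finish immediately and break checkpointing.

```java
// Hypothetical pre-flight check: if the requested source parallelism exceeds
// the number of Kafka partitions, some subtasks would receive no partition,
// finish immediately, and (on Flink 1.13) prevent further checkpoints.
public class SourceParallelismCheck {

    /** Returns the parallelism to actually use, capped at the partition count. */
    public static int cappedParallelism(int requestedParallelism, int numPartitions) {
        if (requestedParallelism <= 0 || numPartitions <= 0) {
            throw new IllegalArgumentException("parallelism and partition count must be positive");
        }
        return Math.min(requestedParallelism, numPartitions);
    }

    public static void main(String[] args) {
        // 8 requested subtasks but only 3 partitions -> cap at 3
        System.out.println(cappedParallelism(8, 3)); // prints 3
    }
}
```

A stricter variant could throw instead of capping, which is essentially what the message above proposes KafkaSource should do at startup.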
Re: Checkpoints fail when trying to copy from Kafka to S3 since "some tasks have been finished"
Hi Robert,

Thanks for the feed-back.

You are correct, the behavior is indeed different: when I make the source bounded, the application eventually stops, whereas without that setting it runs forever.

In both cases neither checkpoints nor data is being written to the filesystem.

I re-ran the experiment to get more info:

* When making the kafka source unbounded (which is not what I want), I notice logs telling that tasks associated with the Kafka source get unregistered in the beginning:

2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.Task Source: Kafka Source -> Sink: Unnamed (3/8)#0 - Freeing task resources for Source: Kafka Source -> Sink: Unnamed (3/8)#0 (70e93520406e12a1e7480a3344f4064d).
2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.TaskExecutor flink-akka.actor.default-dispatcher-6 - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Kafka Source -> Sink: Unnamed (8/8)#0 f02843c05005daced3d5271832f25559.

Later on, it seems each checkpoint fails while complaining that some tasks are not running => would that be caused by the finished tasks above?

2021-08-04 10:55:31,463 [INFO org.apa.fli.run.che.CheckpointCoordinator Checkpoint Timer - Failed to trigger checkpoint for job 321fc6243d8aa091bdb8b913b7c3a679 since some tasks of job 321fc6243d8aa091bdb8b913b7c3a679 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.

* When setting the kafka source back to bounded, I also notice such "unregistered" log messages, before one "Checkpoint Timer - Failed to trigger c" log, and then the job finishes.

By digging a little, this seems similar to this bug: https://issues.apache.org/jira/browse/FLINK-2491

I've tried to add a supplementary ProcessFunction in the pipeline, just to have one more stateful thing and hope to trigger the checkpoint, though without success.
On Tue, 3 Aug 2021, at 1:33 PM, Robert Metzger wrote:
> Hi Svend,
> I'm a bit confused by this statement:
>
>> * In streaming mode, with checkpointing but removing the `setBounded()` on the kafka source yields the same result
>
> My expectation would be that the source runs forever, if it is not bounded. Are you sure this error message is not coming from another task?
>
> On Sat, Jul 31, 2021 at 11:23 AM Svend wrote:
>> Hi everyone,
>>
>> I'm failing to write a simple Flink 1.13.1 application with the DataStream API that reads from kafka and writes to parquet.
>>
>> My intention is to run this as a job that reads what is currently in kafka, shuts down when reaching the current end of each partition and picks up from there next time it's started.
>>
>> I've tried several variations, I can't get anything to work properly:
>>
>> * In streaming mode, enabling checkpointing and setting the kafkaSource to bounded (see code sample below), the application fails to perform checkpoints, complaining about:
>>
>> "Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks of job ... has been finished"
>>
>> => no parquet part gets written, no checkpoint gets written and no kafka offset gets committed
>>
>> * In streaming mode, with checkpointing but removing the `setBounded()` on the kafka source, yields the same result
>>
>> * I also tried in batch mode, removing the checkpoint, switching the StreamingFileSink for a FileSink and using Kafka's "auto.commit.interval.ms" => in that case I'm getting some parquet output and kafka offsets are committed, but the application shuts down before flushing the offset of what has been read, s.t. the latest kafka events will be read again at the next start.
>>
>> This all sounds very basic, I see other people do this kind of thing (recently, [1]), and I was really expecting the combination of KafkaSource with StreamingFileSink and checkpointing to work; all those are streaming concepts. Hopefully I'm doing something wrong?
>>
>> [1] http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser
>>
>> Thanks a lot in advance,
>>
>> Svend
>>
>> ```
>> // I'm testing this by launching the app in an IDE
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class, new DynamicProtobufKryoSerializer(params));
>>
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>> env.enableCheckpointing(1000);
>>
>> env.getCheckpointConfig().setCheck
Checkpoints fail when trying to copy from Kafka to S3 since "some tasks have been finished"
Hi everyone,

I'm failing to write a simple Flink 1.13.1 application with the DataStream API that reads from kafka and writes to parquet.

My intention is to run this as a job that reads what is currently in kafka, shuts down when reaching the current end of each partition and picks up from there next time it's started.

I've tried several variations, I can't get anything to work properly:

* In streaming mode, enabling checkpointing and setting the kafkaSource to bounded (see code sample below), the application fails to perform checkpoints, complaining about:

"Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks of job ... has been finished"

=> no parquet part gets written, no checkpoint gets written and no kafka offset gets committed

* In streaming mode, with checkpointing but removing the `setBounded()` on the kafka source, yields the same result

* I also tried in batch mode, removing the checkpoint, switching the StreamingFileSink for a FileSink and using Kafka's "auto.commit.interval.ms" => in that case I'm getting some parquet output and kafka offsets are committed, but the application shuts down before flushing the offset of what has been read, s.t. the latest kafka events will be read again at the next start.

This all sounds very basic, I see other people do this kind of thing (recently, [1]), and I was really expecting the combination of KafkaSource with StreamingFileSink and checkpointing to work; all those are streaming concepts. Hopefully I'm doing something wrong?
[1] http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser

Thanks a lot in advance,

Svend

```
// I'm testing this by launching the app in an IDE

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class, new DynamicProtobufKryoSerializer(params));

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

KafkaSource kafkaSource = KafkaSource.builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("some-topic")
    .setGroupId("my-group")
    .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setBounded(OffsetsInitializer.latest())
    .build();

StreamingFileSink parquetSink = StreamingFileSink
    .forBulkFormat(
        Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
        new ParquetWriterFactory<>((out) -> new ParquetDynamicMessageWriterBuilder(out, params).build()))
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .sinkTo(parquetSink);

env.execute("my-job");
```
Re: How to configure column width in Flink SQL client?
Thanks for the feed-back Ingo,

Do you think a PR would be welcome to make that parameter configurable?

At the place where I work, UUIDs are often used as column values and they are 36 characters long => very often, a very useful piece of information is not readable to us.

I had a quick look, the max width seems to be defined in [1], and used in various places like [2] and [3]. Should I open a Jira to discuss this and cc you in it?

Cheers,

Svend

[1] https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L74
[2] https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L102
[3] https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java#L143

On Tue, 8 Jun 2021, at 7:34 AM, Ingo Bürk wrote:
> Hi Svend,
>
> unfortunately the column width in the SQL client cannot currently be configured.
>
> Regards
> Ingo
>
> On Mon, Jun 7, 2021 at 4:19 PM Svend wrote:
>> Hi everyone,
>>
>> When using the Flink SQL client and displaying results interactively, it seems the value of any column wider than 24 characters is truncated, which is indicated by a '~' character, e.g. the "member_user_id" below:
>>
>> ```
>> SELECT
>>   metadata.true_as_of_timestamp_millis,
>>   member_user_id,
>>   membership_updated.new_status.updated_value
>> FROM fandom_members_events
>> WHERE group_id = '91170c98-2cc5-4935-9ea6-12b72d32fb3c'
>>
>> true_as_of_timestamp_mil~ member_user_id            updated_value
>> 1622811665919             45ca821f-c0fc-4114-bef8-~ (NULL)
>> 1622811665919             45ca821f-c0fc-4114-bef8-~ JOINED
>> 1622118951005             b4734391-d3e1-417c-ad92-~ (NULL)
>> ...
>> ```
>>
>> Is there a way to configure the displayed width? I didn't find any parameter for this in https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#sql-client-startup-options
>>
>> Thanks a lot in advance!
>>
>> Svend
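For reference, the truncation behaviour discussed in this thread can be mimicked in a few lines. The 25-character limit below is an assumption based on the MAX_COLUMN_WIDTH constant in PrintUtils referenced above, not an exact copy of the SQL client's code:

```java
// Sketch of the SQL client's column truncation: values longer than the max
// column width are cut and suffixed with '~' (width constant assumed here).
public class ColumnTruncation {

    // Assumption, mirroring PrintUtils.MAX_COLUMN_WIDTH: 24 visible chars + '~'
    static final int MAX_COLUMN_WIDTH = 25;

    public static String truncate(String value) {
        if (value.length() <= MAX_COLUMN_WIDTH) {
            return value;
        }
        return value.substring(0, MAX_COLUMN_WIDTH - 1) + "~";
    }

    public static void main(String[] args) {
        // A 36-char UUID loses its last 12 characters:
        System.out.println(truncate("45ca821f-c0fc-4114-bef8-aaaaaaaaaaaa"));
        // prints 45ca821f-c0fc-4114-bef8-~
    }
}
```

This matches the output shown in the question below, where only the first 24 characters of each UUID survive.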
How to configure column width in Flink SQL client?
Hi everyone,

When using the Flink SQL client and displaying results interactively, it seems the value of any column wider than 24 characters is truncated, which is indicated by a '~' character, e.g. the "member_user_id" below:

```
SELECT
  metadata.true_as_of_timestamp_millis,
  member_user_id,
  membership_updated.new_status.updated_value
FROM fandom_members_events
WHERE group_id = '91170c98-2cc5-4935-9ea6-12b72d32fb3c'

true_as_of_timestamp_mil~ member_user_id            updated_value
1622811665919             45ca821f-c0fc-4114-bef8-~ (NULL)
1622811665919             45ca821f-c0fc-4114-bef8-~ JOINED
1622118951005             b4734391-d3e1-417c-ad92-~ (NULL)
...
```

Is there a way to configure the displayed width? I didn't find any parameter for this in https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#sql-client-startup-options

Thanks a lot in advance!

Svend
Re: S3 + Parquet credentials issue
Hi Angelo,

I've not worked exactly in the situation you described, although I've had to configure S3 access from a Flink application recently and here are a couple of things I learnt along the way:

* You should normally not need to include flink-s3-fs-hadoop nor hadoop-mapreduce-client-core in your classpath, but should rather make flink-s3-fs-hadoop available to Flink by putting it into the plugins folder. The motivation for that is that this jar is a fat jar containing a lot of hadoop and aws classes, s.t. including it in your classpath quickly leads to conflicts. The plugins folder is associated with a separate classpath, which helps avoid those conflicts.

* Under the hood, Flink is using the hadoop-aws library to connect to s3 => the documentation regarding how to configure it, and especially security access, is available in [1].

* Ideally, when running on AWS, your code should not be using BasicAWSCredentialsProvider, but instead the application should assume a role, which you associate with some IAM permission. If that's your case, the specific documentation for that situation is in [2]. If you're running some test locally on your laptop, BasicAWSCredentialsProvider with some key id and secret pointing to a dev account may be appropriate.

* As I understand it, any configuration entry in flink-conf.yaml that starts with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in [3]) => by reading the documentation in [1] and [2] you might be able to figure out which parameters are relevant to your case, which you can then set with the mechanism just mentioned.
For example, in my case, I simply add this to flink-conf.yaml:

fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"

* You can debug the various operations that are attempted on S3 by setting this logger to DEBUG level: org.apache.hadoop.fs.s3a

Good luck :)

Svend

[1] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
[2] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
[3] https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-

On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
> Hello,
>
> Trying to read a parquet file located in S3 leads to an AWS credentials exception. Switching to another format (raw, for example) works ok regarding file access.
>
> This is a snippet of code to reproduce the issue:
>
> static void parquetS3Error() {
>
>     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
>
>     TableEnvironment t_env = TableEnvironment.create(settings);
>
>     // parquet format gives error:
>     // Caused by: java.net.SocketTimeoutException: doesBucketExist on bucket-prueba-medusa: com.amazonaws.AmazonClientException:
>     // No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>     // com.amazonaws.SdkClientException: Failed to connect to service endpoint:
>     t_env.executeSql("CREATE TABLE backup ( `date` STRING, `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'parquet')");
>
>     // other formats (i.e. raw) work properly:
>     // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
>     // ++
>     // | url |
>     // ++
>     // | [80, 65, 82, 49, 21, 0, 21,... |
>     // | [0, 0, 0, 50, 48, 50, 49, 4... |
>     t_env.executeSql("CREATE TABLE backup ( `url` BINARY) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
>
>     Table t1 = t_env.from("backup");
>
>     t1.execute().print();
>
> }
>
> Flink version is 1.12.2.
>
> Please find attached the pom with dependencies and version numbers.
>
> What would be a suitable workaround for this?
>
> Thank you very much.
>
> Angelo.
>
> *Attachments:*
> * pom.xml
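As a sketch of the DEBUG logging suggested above: with the Log4j 2 properties format that recent Flink distributions use, the entry in conf/log4j.properties could look like this (the logger id "s3a" is an arbitrary name chosen here; adapt the syntax if you are on a different logging setup):

```properties
# Log all S3A filesystem operations at DEBUG level (verbose!)
logger.s3a.name = org.apache.hadoop.fs.s3a
logger.s3a.level = DEBUG
```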
Re: Flink in k8s operators list
Hi Ilya,

Thanks for the kind feed-back.

We hit the first issue you mentioned related to K8s 1.18+; we then updated the controller-gen version to 0.2.4 in the makefile as described in the ticket you linked, and then ran "make deploy", which worked around the issue for us.

I'm not aware of the 2nd issue you refer to related to in-progress jobs? In case that helps, we access the Flink UI by simply opening a port-forward on port 8081 on the job manager, which among other things shows the currently running jobs.

Svend

On Mon, 31 May 2021, at 12:00 PM, Ilya Karpov wrote:
> Hi Svend,
>
> thank you so much for sharing your experience! The GCP k8s operator looks promising (currently I'm trying to build it and run the helm chart. An issue <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/266> with k8s version 1.18+ is a road block right now, but I see that there is a solution), and it also seems like the flink team refers <http://disq.us/p/2f7goq6> to this implementation.
>
> In your setup did you solve the problem of visualising the list of in-progress jobs?
>
>> One worrying point though is that the maintainers of the repo seem to have become silent in March this year.
> lyft's implementation <https://github.com/lyft/flinkk8soperator> (haven't tried it yet) seems to be even more abandoned (last release 20/04/2020).
>
>> On 29 May 2021, at 11:23, Svend wrote:
>>
>> Hi Ilya,
>>
>> At my company we're currently using the GCP k8s operator (2nd on your list). Our usage is very moderate, but so far it works great for us.
>>
>> We appreciate that when upgrading the application, it automatically triggers a savepoint during shutdown and resumes from it when restarting. It also allows taking savepoints at regular intervals (we take one per day currently).
>>
>> We're using it with Flink 1.12.4 and AWS EKS.
>>
>> Getting the Flink metrics and logs exported to our monitoring system worked out of the box.
>>
>> Configuring IAM roles and K8s service accounts for saving checkpoints and savepoints to S3 required a bit more fiddling, although we got it working.
>>
>> Happy to share code snippets about any of that if that's useful :)
>>
>> It was last updated with Flink 1.11 in mind, so there is currently no built-in support for the reactive scaling mode recently added in Flink 1.13.
>>
>> One worrying point though is that the maintainers of the repo seem to have become silent in March this year. There is a small and active community around it though, and issues and PRs keep on arriving and are waiting for feed-back. It's all free and OSS, so who are we to complain? Though it's still an important attention point.
>>
>> Hope this helps,
>>
>> Svend
>>
>> On Fri, 28 May 2021, at 9:09 AM, Ilya Karpov wrote:
>>> Hi there,
>>>
>>> I'm doing a little research about the easiest way to deploy a Flink job to a k8s cluster and manage its lifecycle by a *k8s operator*. The list of solutions is below:
>>> - https://github.com/fintechstudios/ververica-platform-k8s-operator
>>> - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
>>> - https://kudo.dev/docs/examples/apache-flink.html
>>> - https://github.com/wangyang0918/flink-native-k8s-operator
>>>
>>> If you are using something that is not listed above, please share! Any share about how a specific solution works is greatly appreciated.
>>>
>>> Thanks in advance
Re: Idle source configuration per topic with the Kafka Table API connector
Awesome, thanks a lot for the clarifications Jing Zhang, it's very useful.

Best,

Svend

On Sun, 30 May 2021, at 6:27 AM, JING ZHANG wrote:
> Hi Svend,
> Your solution could work well in Flink 1.13.0 and later, because those versions provide many related improvements.
>
>> as per [1]
> Yes, "table.exec.source.idle-timeout" is not a table-level parameter, but a global parameter. It applies to all those table sources which have a watermark clause but do not use SOURCE_WATERMARK.
>
>> as per [2]
> Yes.
>
>> If that is correct, I guess I can simply use the DataStream connector for that specific topic and then convert it to a Table.
> Yes, and please use SOURCE_WATERMARK() when converting the DataStream to a Table, like the following demo:
>
> Table table =
>     tableEnv.fromDataStream(
>         dataStream,
>         Schema.newBuilder()
>             . // other logic
>             .watermark("columnName", "SOURCE_WATERMARK()")
>             .build());
>
> I would like to invite Jark and Timo to double check, they are more familiar with the issue.
>
> Best,
> JING ZHANG
>
> On Sat, 29 May 2021 at 3:34 PM, Svend wrote:
>> Hi everyone,
>>
>> My Flink streaming application consumes several Kafka topics, one of which receives traffic in bursts once per day.
>>
>> I would like that topic not to hold back the progress of the watermark.
>>
>> Most of my code is currently using the SQL API and in particular the Table API Kafka connector.
>>
>> I have read about the idle source configuration mechanism, could you please confirm my understanding that:
>>
>> * as per [1]: when I'm using the Table API Kafka connector, we currently do not have the possibility to specify the idle source parameter specifically for each topic, although we can set it globally on the StreamTableEnvironment with the "table.exec.source.idle-timeout" parameter
>>
>> * as per [2]: when using the DataStream Kafka connector, we can set the idle source parameter specifically for each topic by specifying ".withIdleness()" on the WatermarkStrategy.
>> >> If that is correct, I guess I can simply use the DataStream connector for >> that specific topic and then convert it to a Table. >> >> Thanks a lot! >> >> Svend >> >> >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks >> >> [2] >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission >> >> >>
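To make the workaround discussed above concrete, here is a minimal sketch (an editor's addition, not code from the thread) of reading only the bursty topic with the DataStream Kafka connector, declaring idleness on that source alone, and converting it to a Table with SOURCE_WATERMARK(). The topic name, Kafka properties, timeout durations and view name are all placeholders; the Schema pattern assumes Flink 1.13+.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BurstyTopicToTable {
    public static void main(String[] args) {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        var tableEnv = StreamTableEnvironment.create(env);

        var props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "bursty-demo");

        // DataStream Kafka source for the bursty topic only; the other topics
        // can keep using their Table API connectors untouched.
        var consumer = new FlinkKafkaConsumer<>(
                "bursty-topic", new SimpleStringSchema(), props);

        // Idleness is declared on this source alone: when the topic goes
        // quiet, it is marked idle and stops holding back the watermark.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                        .withIdleness(Duration.ofMinutes(1)));

        // SOURCE_WATERMARK() tells the planner to reuse the DataStream
        // watermark when converting the stream into a Table.
        Table table = tableEnv.fromDataStream(
                env.addSource(consumer),
                Schema.newBuilder()
                        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                        .watermark("rowtime", "SOURCE_WATERMARK()")
                        .build());

        tableEnv.createTemporaryView("bursty_topic", table);
    }
}
```

The rest of the pipeline can then join or union `bursty_topic` with the tables defined through the regular Table API Kafka connector.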
Re: Flink in k8s operators list
Hi Ilya, At my company we're currently using the GCP k8s operator (2nd on your list). Our usage is very moderate, but so far it works great for us. We appreciate that when upgrading the application, it automatically triggers a savepoint during shutdown and resumes from it when restarting. It also allows taking savepoints at regular intervals (we take one per day currently). We're using it with Flink 1.12.4 and AWS EKS. Getting the Flink metrics and logs exported to our monitoring system worked out of the box. Configuring IAM roles and the K8s service account for saving checkpoints and savepoints to S3 required a bit more fiddling, although we got it working. Happy to share code snippets about any of that if that's useful :) It was last updated with Flink 1.11 in mind, so there is currently no built-in support for the reactive scaling mode recently added in Flink 1.13. One worrying point though is that the maintainers of the repo seem to have gone silent since March this year. There is a small and active community around it though, and issues and PRs keep arriving and are waiting for feedback. It's all free and OSS, so who are we to complain? Though it's still an important attention point. Hope this helps, Svend On Fri, 28 May 2021, at 9:09 AM, Ilya Karpov wrote: > Hi there, > > I’m making a little research about the easiest way to deploy a Flink job to a k8s > cluster and manage its lifecycle by *k8s operator*. The list of solutions is > below: > - https://github.com/fintechstudios/ververica-platform-k8s-operator > - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator > - https://kudo.dev/docs/examples/apache-flink.html > - https://github.com/wangyang0918/flink-native-k8s-operator > > If you are using something that is not listed above please share! Any information about > how a specific solution works is greatly appreciated. > > Thanks in advance
Idle source configuration per topic with the Kafka Table API connector
Hi everyone, My Flink streaming application consumes several Kafka topics, one of which receives traffic in bursts once per day. I would like that topic not to hold back the progress of the watermark. Most of my code is currently using the SQL API and in particular the Table API Kafka connector. I have read about the idle source configuration mechanism, could you please confirm my understanding that: * as per [1]: when I'm using the Table API Kafka connector, we currently do not have the possibility to specify the idle source parameter specifically for each topic, although we can set it globally on the StreamTableEnvironment with the "table.exec.source.idle-timeout" parameter * as per [2]: when using the DataStream Kafka connector, we can set the idle source parameter specifically for each topic by specifying ".withIdleness()" on the WatermarkStrategy. If that is correct, I guess I can simply use the DataStream connector for that specific topic and then convert it to a Table. Thanks a lot! Svend [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission
How to process recent events from Kafka and older ones from another storage?
Hi everyone, What is the typical architectural approach with Flink SQL for processing recent events from Kafka and older events from some separate, cheaper storage? I currently have the following situation in mind: * events appear in Kafka and are retained there for, say, 1 month * events are also regularly forwarded to AWS S3 into Parquet files Given a use case, or a bugfix, that requires (re)processing events older than 1 month, how do we design a Flink SQL pipeline that produces correct results? * one possibility I envisaged is simply to first start the Flink SQL application using a FileSystem connector to read the events from Parquet, then to shut it down while triggering a savepoint, and finally resume it from that savepoint while now using the Kafka connector. I saw some presentations where engineers from Lyft were discussing that approach. From what I understand though, the FileSystem connector currently does not emit watermarks (I understand https://issues.apache.org/jira/browse/FLINK-21871 is addressing just that), so if I get things correctly that would imply that none of my Flink SQL code can depend on event time, which seems very restrictive. * another option is to use infinite retention in Kafka, which is expensive, or to copy old events from S3 back to Kafka when we need to process them. Since streaming in Flink, data retention in Kafka and pipeline backfilling are such common concepts, I imagine that many teams are already addressing the situation I'm describing above. What is the usual way of approaching this? Thanks a lot in advance
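One way to make the first option above concrete (an editor's sketch, not from the thread): define the same logical table twice, once over the Parquet files on S3 and once over the Kafka topic, so the identical query can run against either during a backfill or in live mode. Table names, columns, the S3 path and the topic are all hypothetical.

```java
public class BackfillDdl {
    // Same logical schema for both sources; only the connector differs.
    static final String COLUMNS =
            "  user_id STRING,\n"
            + "  event_time TIMESTAMP(3),\n"
            + "  WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE\n";

    // Historical events: Parquet files on S3. Note FLINK-21871: watermark
    // support on the filesystem source was still pending at the time,
    // which is exactly the limitation discussed above.
    static final String HISTORY_DDL =
            "CREATE TABLE events_history (\n" + COLUMNS + ") WITH (\n"
            + "  'connector' = 'filesystem',\n"
            + "  'path' = 's3://my-bucket/events/',\n"
            + "  'format' = 'parquet'\n"
            + ")";

    // Recent events: the Kafka topic with 1 month retention.
    static final String LIVE_DDL =
            "CREATE TABLE events_live (\n" + COLUMNS + ") WITH (\n"
            + "  'connector' = 'kafka',\n"
            + "  'topic' = 'events',\n"
            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
            + "  'scan.startup.mode' = 'earliest-offset',\n"
            + "  'format' = 'json'\n"
            + ")";

    public static void main(String[] args) {
        // In a real job these would be passed to tableEnv.executeSql(...)
        // before running the shared query against one table or the other.
        System.out.println(HISTORY_DDL);
        System.out.println(LIVE_DDL);
    }
}
```

Switching from the history table to the live table at a savepoint boundary is then a matter of which DDL the application registers at startup.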
Re: How to create a java unit test for Flink SQL with nested field and watermark?
Thanks for the feedback. The CSV is a good idea and will make my tests more readable, I'll use that. Looking forward to Flink 1.13! Svend On Fri, 30 Apr 2021, at 9:09 AM, Timo Walther wrote: > Hi, > > there are multiple ways to create a table for testing: > > - use the datagen connector > - use the filesystem connector with CSV data > - and beginning from Flink 1.13 your code snippets become much simpler > > Regards, > Timo > > On 29.04.21 20:35, Svend wrote: > > I found an answer to my own question! > > > > For future reference, the snippet below allows creating a SQL table with > > a nested field and a watermark, filled with hard-coded values, which > > is all I need in order to test SQL expressions. > > > > It's quite a mouthful though, is there a more succinct way to express the > > same thing? > > > > > > var testData = List.of( > > Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")), > > Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")), > > Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20")) > > ); > > var testStream = streamEnv > > .fromCollection(testData, > > Types.ROW_NAMED(new String[] {"created", "event_time"}, > > Types.ROW_NAMED(new String[] {"fandom_id"}, Types.STRING), > > Types.SQL_TIMESTAMP > > ) > > ) > > .assignTimestampsAndWatermarks(WatermarkStrategy > > .forBoundedOutOfOrderness(Duration.ofMinutes(10)) > > .withTimestampAssigner( > > TimestampAssignerSupplier.of((t2, t) -> ((Timestamp) > > (t2.getField(1))).getTime())) > > ); > > var testDataTable = tableEnv.fromDataStream(testStream, $("created"), > > $("event_time").rowtime()); > > tableEnv.createTemporaryView("post_events_kafka", testDataTable); > > > > > > > > > > > > On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote: > >> I'm trying to write a Java unit test for a Flink SQL application using > >> the Flink mini cluster, but I do not manage to create an input table with > >> nested fields and time 
characteristics. > >> > >> I had a look at the documentation and examples below, although I'm > >> still struggling: > >> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html > >> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java > >> > >> Consider for example this simple expression that I want to test and > >> which depends on the nested field "created.group_id" and expects > >> "metricValue" to be the row time: > >> > >> var createTableDDl = "" > >> + " CREATE TEMPORARY VIEW postCreated10min \n" > >> + " AS \n" > >> + " SELECT \n" > >> + " created.group_id as groupId, \n" > >> + " TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n" > >> + " TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n" > >> + " count(1) as metricValue \n" > >> + " FROM post_events_kafka \n" > >> + " GROUP BY \n" > >> + " created.group_id, \n" > >> + " TUMBLE(event_time, INTERVAL '10' MINUTES)
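For reference, the simplification Timo alludes to for Flink 1.13: fromDataStream accepts a Schema, so the nested row, the rowtime column and the watermark can all be declared in one place, without the WatermarkStrategy/TimestampAssigner boilerplate. An editor's sketch under that assumption; the field name "ts" and the bound are placeholders, and the declared Schema is assumed to augment the columns derived from the stream's type information.

```java
import java.sql.Timestamp;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class NestedRowTestTable13 {
    public static void main(String[] args) {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        var tableEnv = StreamTableEnvironment.create(env);

        // Same nested test rows as in the thread, with explicit type info
        // so the planner sees named columns instead of a RAW type.
        var rows = env.fromElements(
                        Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
                        Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
                        Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20")))
                .returns(Types.ROW_NAMED(
                        new String[] {"created", "ts"},
                        Types.ROW_NAMED(new String[] {"group_id"}, Types.STRING),
                        Types.SQL_TIMESTAMP));

        // Rowtime and watermark declared directly in the Schema (1.13+).
        var table = tableEnv.fromDataStream(
                rows,
                Schema.newBuilder()
                        .columnByExpression("event_time", "CAST(ts AS TIMESTAMP(3))")
                        .watermark("event_time", "event_time - INTERVAL '10' MINUTE")
                        .build());

        tableEnv.createTemporaryView("post_events_kafka", table);
    }
}
```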
Re: How to create a java unit test for Flink SQL with nested field and watermark?
I found an answer to my own question! For future reference, the snippet below allows creating a SQL table with a nested field and a watermark, filled with hard-coded values, which is all I need in order to test SQL expressions. It's quite a mouthful though, is there a more succinct way to express the same thing? var testData = List.of( Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")), Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")), Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20")) ); var testStream = streamEnv .fromCollection(testData, Types.ROW_NAMED(new String[] {"created", "event_time"}, Types.ROW_NAMED(new String[] {"fandom_id"}, Types.STRING), Types.SQL_TIMESTAMP ) ) .assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness(Duration.ofMinutes(10)) .withTimestampAssigner( TimestampAssignerSupplier.of((t2, t) -> ((Timestamp) (t2.getField(1))).getTime())) ); var testDataTable = tableEnv.fromDataStream(testStream, $("created"), $("event_time").rowtime()); tableEnv.createTemporaryView("post_events_kafka", testDataTable); On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote: > I'm trying to write a Java unit test for a Flink SQL application using the Flink > mini cluster, but I do not manage to create an input table with nested fields > and time characteristics. 
> > I had a look at the documentation and examples below, although I'm still > struggling: > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html > https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java > > > Consider for example this simple expression that I want to test and which > depends on the nested field "created.group_id" and expects "metricValue" to > be the row time: > > > var createTableDDl = "" > + " CREATE TEMPORARY VIEW postCreated10min \n" > + " AS \n" > + " SELECT \n" > + " created.group_id as groupId, \n" > + " TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n" > + " TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n" > + " count(1) as metricValue \n" > + " FROM post_events_kafka \n" > + " GROUP BY \n" > + " created.group_id, \n" > + " TUMBLE(event_time, INTERVAL '10' MINUTES) \n"; > tableEnv.executeSql(createTableDDl); > > > In a unit test, the following syntax allows me to create test input data with > nested fields, but I have not found how to specify row time nor watermarks > with this approach: > > > Table testTable = tableEnv.fromValues( > DataTypes.ROW( > DataTypes.FIELD("created", > DataTypes.ROW( > DataTypes.FIELD("group_id", DataTypes.STRING()) > ) > ), > DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)) > ), > > row(row("group123"), "2021-02-03 11:36:20"), > row(row("group123"), "2021-02-03 11:38:20"), > row(row("group123"), "2021-02-03 11:40:20") > ); > tableEnv.createTemporaryView("post_events_kafka", testTable); > > > I have also tried the following syntax, which allows specifying the watermark and > row time, but I have not found how to create a nested field with this > approach: > > > var testData = List.of( > Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")), > Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")), 
> Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20")) > ); > var testStream = streamEnv > .fromCollection(testData) > .assignTimestampsAndWatermarks(WatermarkStrategy > .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10)) > .withTimestampAssigner( > TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime())) > ); > var testDataTable = tableEnv.fromDataStream( > testStream, > $("group_id"), $("true_as_of"), $("event_time").rowtime() > ); > tableEnv.createTemporaryView("post_events_kafka", testDataTable); > > > > What am I missing?
How to create a java unit test for Flink SQL with nested field and watermark?
I'm trying to write a Java unit test for a Flink SQL application using the Flink mini cluster, but I do not manage to create an input table with nested fields and time characteristics. I had a look at the documentation and examples below, although I'm still struggling: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java Consider for example this simple expression that I want to test and which depends on the nested field "created.group_id" and expects "metricValue" to be the row time: var createTableDDl = "" + " CREATE TEMPORARY VIEW postCreated10min \n" + " AS \n" + " SELECT \n" + " created.group_id as groupId, \n" + " TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n" + " TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n" + " count(1) as metricValue \n" + " FROM post_events_kafka \n" + " GROUP BY \n" + " created.group_id, \n" + " TUMBLE(event_time, INTERVAL '10' MINUTES) \n"; tableEnv.executeSql(createTableDDl); In a unit test, the following syntax allows me to create test input data with nested fields, but I have not found how to specify row time nor watermarks with this approach: Table testTable = tableEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("created", DataTypes.ROW( DataTypes.FIELD("group_id", DataTypes.STRING()) ) ), DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)) ), row(row("group123"), "2021-02-03 11:36:20"), row(row("group123"), "2021-02-03 11:38:20"), row(row("group123"), "2021-02-03 11:40:20") ); tableEnv.createTemporaryView("post_events_kafka", testTable); I have also tried the following syntax, which allows specifying the watermark and row time, but I have not found how to create a nested field with this approach: var testData = List.of( Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")), Tuple2.of("group123", 
Timestamp.valueOf("2021-02-03 11:38:20")), Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20")) ); var testStream = streamEnv .fromCollection(testData) .assignTimestampsAndWatermarks(WatermarkStrategy .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10)) .withTimestampAssigner( TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime())) ); var testDataTable = tableEnv.fromDataStream( testStream, $("group_id"), $("true_as_of"), $("event_time").rowtime() ); tableEnv.createTemporaryView("post_events_kafka", testDataTable); What am I missing?
Re: how to convert DataStream to Table
Hi, Here's an example that works for me: """ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import static org.apache.flink.table.api.Expressions.$; import java.util.List; public class Stream2Table { public static void main(String[] args) { var streamingEnv = StreamExecutionEnvironment.getExecutionEnvironment(); var tableEnv = StreamTableEnvironment.create(streamingEnv); var userRows = streamingEnv.fromCollection( List.of( Row.of("user1", "al...@mail.org", "Alice"), Row.of("user2", "b...@mail.org", "Bob") ), new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING)); var table = tableEnv .fromDataStream(userRows, $("user_id"), $("handle"), $("name")); table.execute().print(); } } """ You can also dig here, you'll probably find better examples: https://github.com/apache/flink/tree/master/flink-examples/flink-examples-table Cheers, Svend On Sun, 11 Apr 2021, at 3:35 PM, vtygoss wrote: > > Hi All, > > there is a scenario where I need to process OGG log data in Kafka using Flink > SQL. I can convert the OGG log stream to a DataStream<Row> where each event > has a RowKind, but I have trouble converting that DataStream<Row> to a Table. > For testing, I tried the StreamTableEnvironment#fromDataStream and > createTemporaryView APIs; for both, the TableSchema is > ``` > root > |-- f0: LEGACY('RAW', 'ANY') > ``` > > I want to get the schema: > > ``` > root > |-- column1: Type, > |-- column2: Type, > ... > ``` > > > How do I convert a DataStream<Row> with RowKind to a Table? > > > Thank you very much for your reply >
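Since the question is specifically about preserving the per-row RowKind, it is worth adding (an editor's note, assuming Flink 1.13+): StreamTableEnvironment.fromChangelogStream interprets a DataStream of Rows as a changelog, so INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE kinds are carried into the Table instead of being treated as insert-only. A sketch with made-up column names:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class ChangelogToTable {
    public static void main(String[] args) {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        var tableEnv = StreamTableEnvironment.create(env);

        // Each Row carries its own RowKind, as would be produced from an
        // OGG-style change log.
        var changelog = env.fromElements(
                        Row.ofKind(RowKind.INSERT, "user1", 10),
                        Row.ofKind(RowKind.UPDATE_BEFORE, "user1", 10),
                        Row.ofKind(RowKind.UPDATE_AFTER, "user1", 20))
                .returns(Types.ROW_NAMED(
                        new String[] {"name", "score"}, Types.STRING, Types.INT));

        // fromChangelogStream keeps the RowKind instead of treating every
        // record as an insert, and derives named columns from the type info.
        var table = tableEnv.fromChangelogStream(changelog);
        tableEnv.createTemporaryView("user_changes", table);
    }
}
```

On Flink 1.12 (as in this thread) fromChangelogStream is not available, so the RowTypeInfo-based conversion above is the practical route.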
Failed to register Protobuf Kryo serialization
Hi all, I'm failing to set up an example of wire serialization with Protobuf, could you help me figure out what I'm doing wrong? I'm using a simple protobuf schema: ``` syntax = "proto3"; import "google/protobuf/wrappers.proto"; option java_multiple_files = true; message DemoUserEvent { Metadata metadata = 1; oneof payload { Created created = 10; Updated updated = 11; } message Created {...} message Updated {...} ... } ``` From which I'm generating Java code with this Gradle plugin: ``` plugins { id "com.google.protobuf" version "0.8.15" } ``` And I'm generating DemoUserEvent instances with a Java Iterator looking like this: ``` public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable { transient public final static Faker faker = new Faker(); ... @Override public DemoUserEvent next() { return randomCreatedEvent(); } ... ``` I read those two pieces of documentation: * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html And tried the demo app below: ``` import com.twitter.chill.protobuf.ProtobufSerializer; ... 
public static void main(String[] args) { final StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment(); flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class); flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print(); } ``` But the serialization mechanism still fails to handle my protobuf class: 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a getter for field payloadCase_ 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a setter for field payloadCase_ 11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. I've also tried this, without success: ``` flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class); ``` I'm using those versions: ``` ext { javaVersion = '11' flinkVersion = '1.12.1' scalaBinaryVersion = '2.12' } dependencies { compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" implementation ("com.twitter:chill-protobuf:0.9.5") { exclude group: 'com.esotericsoftware.kryo', module: 'kryo' } implementation "com.google.protobuf:protobuf-java:3.14.0" implementation 'com.github.javafaker:javafaker:1.0.2' } ``` Any idea what I should try next? Thanks in advance!