Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

2021-08-26 Thread Svend
Hi Jonas,

Just a thought, could you try this policy? If I recall correctly, you need 
ListBucket on the bucket ARN itself, whereas the object-level actions can use a 
path prefix like the "/*" you added:

"
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"s3:ListBucket",
"s3:Get*",
"s3:Put*",
"s3:Delete*"
],
"Resource": [
"arn:aws:s3:::-flink-dev",
"arn:aws:s3:::-flink-dev/*"
],
"Effect": "Allow"
}
]
}
"

Svend


On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
> Hey Matthias,
> 
> Yes, I have followed the documentation on the link you provided - and decided 
> to go for the recommended approach of using IAM roles. 
> The hive.s3.use-instance-credentials configuration parameter I got from [1] 
> (first bullet) since I am using the flink-s3-fs-presto plugin - which says:
> 
> ...`flink-s3-fs-presto`, registered under the scheme *s3://* and *s3p://*, is 
> based on code from the Presto project <https://prestodb.io/>. You can 
> configure it using the same configuration keys as the Presto file system 
> <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>, 
> by adding the configurations to your `flink-conf.yaml`. The Presto S3 
> implementation is the recommended file system for checkpointing to S3.
> 
> It's possible I am misunderstanding it?
> 
> Best,
> Jonas
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
> 
> Den tors 26 aug. 2021 kl 16:32 skrev Matthias Pohl :
>> Hi Jonas,
>> have you included the s3 credentials in the Flink config file like it's 
>> described in [1]? I'm not sure about this hive.s3.use-instance-credentials 
>> being a valid configuration parameter. 
>> 
>> Best,
>> Matthias
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>> 
>> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob  wrote:
>>> Hey,
>>> 
>>> I am setting up HA on a standalone Kubernetes Flink application job 
>>> cluster. 
>>> Flink (1.12.5) is used and I am using S3 as the storage backend 
>>> 
>>> * The JobManager fails shortly after starting with the following errors 
>>> (apologies in advance for the length), and I can't understand what's going on.
>>> * At first I thought it might be due to missing Delete privileges on the IAM 
>>> role and updated that, but the problem persists. 
>>> * The S3 bucket configured s3:///recovery is empty.
>>> 
>>> configmap.yaml
>>> flink-conf.yaml: |+
>>>   jobmanager.rpc.address: {{ $fullName }}-jobmanager
>>>   jobmanager.rpc.port: 6123
>>>   jobmanager.memory.process.size: 1600m
>>>   taskmanager.numberOfTaskSlots: 2
>>>   taskmanager.rpc.port: 6122
>>>   taskmanager.memory.process.size: 1728m
>>>   blob.server.port: 6124
>>>   queryable-state.proxy.ports: 6125
>>>   parallelism.default: 2
>>>   scheduler-mode: reactive
>>>   execution.checkpointing.interval: 10s
>>>   restart-strategy: fixed-delay
>>>   restart-strategy.fixed-delay.attempts: 10
>>>   high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>   kubernetes.cluster-id: {{ $fullName }}
>>>   high-availability.storageDir: s3://-flink-{{ .Values.environment }}/recovery
>>>   hive.s3.use-instance-credentials: true
>>>   kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
>>> 
>>> role.yaml
>>> kind: Role
>>> apiVersion: rbac.authorization.k8s.io/v1
>>> metadata:
>>>   name: {{ $fullName }}
>>>   namespace: {{ $fullName }}
>>>   labels:
>>>     app: {{ $appName }}
>>>     chart: {{ template "thoros.chart" . }}
>>>     release: {{ .Release.Name }}
>>>     heritage: {{ .Release.Service }}
>>> 
>>> rules:
>>> - apiGroups: [""]
>>>   resources: ["configmaps"]
>>>   verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>>> 
>>> aws IAM policy
>>> {
>>> "Version": "2012-10-17&

Re: Production Grade GitOps Based Kubernetes Setup

2021-08-10 Thread Svend
Hi all,

I reached out [1] to Filipe Regadas, the author of the Spotify fork of the GCP 
k8s operator you linked in your message. He confirms he's actively working on 
it and would welcome PRs and community input. I have a modest PR that I'll submit 
to him some time this week.

This seems to me a promising option to look into.

Svend


[1] https://github.com/spotify/flink-on-k8s-operator/issues/82

On Mon, 9 Aug 2021, at 12:36 PM, Niklas Wilcke wrote:
> Hi Yuval,
> 
> thank you for sharing all the information. I forgot to mention the Lyft 
> operator. Thanks for "adding" it to the list.
> About the dual cluster approach during upgrade I have some doubts about the 
> resource usage. If you are operating some "big" jobs that would mean you 
> always have to provide enough resources to run two of them in parallel during 
> the upgrade or is there some workaround (downscaling, upscaling) available?
> I will further investigate how the option three 
> "GoogleCloudPlatform/flink-on-k8s-operator" is implementing the upgrade 
> process.
> 
> I agree that there is a need for a community based operator project. It is 
> unfortunate that both "relevant" projects (Lyft, GCP) have been more or less 
> abandoned. The only thing left that I can see with some activity is a fork 
> of the GCP operator from Spotify [0], but there is only one person involved.
> 
> Regards,
> Niklas
> 
> [0] https://github.com/spotify/flink-on-k8s-operator
> 
> 
> UNIBERG GmbH 
> Simon-von-Utrecht-Straße 85a
> 20359 Hamburg
> 
> niklas.wil...@uniberg.com
> Mobile: +49 160 9793 2593
> Office: +49 40 2380 6523
> 
> 
> UNIBERG GmbH, Dorfstraße 3, 23816 Bebensee 
> 
> Registergericht / Register: Amtsgericht Kiel HRB SE-1507
> Geschäftsführer / CEO‘s: Andreas Möller, Martin Ulbricht
> 
> Informationen zum Datenschutz / Privacy Information: 
> https://www.uniberg.com/impressum.html
> 
>> On 6. Aug 2021, at 16:59, Yuval Itzchakov  wrote:
>> 
>> Hi Niklas,
>> 
>> We are currently using the Lyft operator for Flink in production 
>> (https://github.com/lyft/flinkk8soperator), which is an additional alternative. 
>> The project itself is pretty much in a zombie state, but commits happen every 
>> now and then.
>> 
>> 1. Native Kubernetes could definitely work with GitOps, it would just 
>> require you to do lots of steps "by hand" in terms of application upgrade 
>> and rollover.
>> 2. We're using Lyft's operator as mentioned above. It mostly works well; 
>> there were several issues we had along the way but they were mostly resolved. One 
>> feature that is missing for us specifically is being able to perform an 
>> upgrade by first savepointing and killing the existing cluster and only then 
>> deploying a new one (their approach is dual, meaning they have two clusters up 
>> and running before doing the rollover).
>> 3. At its current state it looks more like a side project than an actively 
>> maintained operator.
>> 4. Ververica is definitely an option, we haven't tested their operator, not 
>> sure about the maturity level yet.
>> 
>> I think a Flink community based operator for k8s is a much needed project 
>> (which I'd be happy to contribute to).
>> 
>> 
>> 
>> 
>> On Fri, Aug 6, 2021, 14:49 Niklas Wilcke  wrote:
>>> Hi Flink Community,
>>> 
>>> I'm currently assessing the situation about how to properly deploy Flink on 
>>> Kubernetes via GitOps. There are some options available to deploy Flink on 
>>> Kubernetes, which I would like to discuss.  In general we are looking for 
>>> an open source or at least unpaid solution, but I don't exclude paid 
>>> solutions from the beginning.
>>> I see the following options.
>>> 
>>> 1. Kubernetes Standalone [1]
>>> * Seems to be deprecated, since the docs state to use Native Kubernetes 
>>> instead
>>> 2. Native Kubernetes [2]
>>> * Doesn't seem to implement the Kubernetes operator pattern
>>> * Seems to require command line activities to be operated / upgraded (not 
>>> GitOps compatible out of the box)
>>> 3. "GoogleCloudPlatform/flink-on-k8s-operator" Operator [3]
>>> * Seems not to be well maintained / documented
>>> * We had some trouble with crashes during configuration changes, but we 
>>> need to investigate further
>>> * There is a "maintained" fork from spotify, which could be an option
>>> 4. Flink Native Kubernetes Operator [4]
>>> * Seems to be a private project from a Flink Committer, which might not be 
>

Re: Checkpoints fail when trying to copy from Kafka to S3 since "some tasks have been finished"

2021-08-04 Thread Svend

Hi again,

After a bit of experimentation (and actually reading the bug report I linked), 
I realized the issue was that the parallelism was higher than the number of 
Kafka partitions => reducing the parallelism enabled the checkpoints to work as 
expected.

=> Since it seems unsupported, maybe KafkaSource should refuse to even start if 
its configured parallelism is higher than the number of Kafka partitions? Otherwise, 
this error condition is rather difficult to interpret IMHO. I'm happy to open a 
Jira and work on that if that's desired.
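
For reference, here is roughly the workaround, reusing the `kafkaSource` and 
`parquetSink` names from my earlier snippet (the partition count of 4 is just an 
example):

```
// Cap the source parallelism at the number of Kafka partitions so that no
// source subtask finishes immediately for lack of an assigned partition.
int numKafkaPartitions = 4;

env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .setParallelism(numKafkaPartitions)
    .sinkTo(parquetSink);
```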

Best regards,

Svend



On Wed, 4 Aug 2021, at 11:32 AM, Svend wrote:
> Hi Robert, 
> 
> Thanks for the feed-back.
> 
> You are correct, the behavior is indeed different: when I make the source 
> bounded, the application eventually stops whereas without that setting it 
> runs forever.
> 
> In both cases neither checkpoints nor data is being written to the filesystem.
> 
> I re-ran the experiment to get more info:
> 
> * when making the kafka source unbounded (which is not what I want), I notice 
> logs telling that tasks associated with the Kafka source get unregistered at the 
> beginning:
> 
> 2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.Task Source: Kafka Source 
> -> Sink: Unnamed (3/8)#0 – Freeing task resources for Source: Kafka Source 
> -> Sink: Unnamed (3/8)#0 (70e93520406e12a1e7480a3344f4064d).
> 2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.TaskExecutor 
> flink-akka.actor.default-dispatcher-6 – Un-registering task and sending 
> final execution state FINISHED to JobManager for task Source: Kafka Source -> 
> Sink: Unnamed (8/8)#0 f02843c05005daced3d5271832f25559.
> 
> 
> Later on, it seems each checkpoint fails while complaining that some tasks 
> are not running => would that be caused by the finished tasks above?
> 
> 2021-08-04 10:55:31,463 [INFO org.apa.fli.run.che.CheckpointCoordinator 
> Checkpoint Timer – Failed to trigger checkpoint for job 
> 321fc6243d8aa091bdb8b913b7c3a679 since some tasks of job 
> 321fc6243d8aa091bdb8b913b7c3a679 has been finished, abort the checkpoint 
> Failure reason: Not all required tasks are currently running.
> 
> 
> * when setting the kafka source back to bounded, I also notice such an 
> "unregistered" log message, before one "Checkpoint Timer – Failed to 
> trigger c" log, and then the job finishes
> 
> 
> By digging a little this seems similar to this bug: 
> https://issues.apache.org/jira/browse/FLINK-2491
> 
> I've tried to add a supplementary ProcessFunction in the pipeline just to 
> have one more stateful thing and hope to trigger the checkpoint, though 
> without success. 
> 
> 
> 
> 
> On Tue, 3 Aug 2021, at 1:33 PM, Robert Metzger wrote:
>> Hi Svend,
>> I'm a bit confused by this statement:
>> 
>>> * In streaming mode, with checkpointing but removing the `setBounded()` on the 
>>> kafka source yields the same result
>> 
>> My expectation would be that the source runs forever, if it is not bounded.
>> Are you sure this error message is not coming from another task?
>> 
>> 
>> On Sat, Jul 31, 2021 at 11:23 AM Svend  wrote:
>>> __
>>> Hi everyone,
>>> 
>>> I'm failing to write a simple Flink 1.13.1 application with the DataStream 
>>> that reads from kafka and writes to parquet.
>>> 
>>> My intention is to run this as a job that reads what is currently in kafka, 
>>> shuts down when reaching current end of each partition and picks up from 
>>> there next time it's started.
>>> 
>>> I've tried several variations, I can't get anything to work properly:
>>> 
>>> * In streaming mode, enabling checkpoint and setting the kafkaSource to 
>>> bounded (see code sample below), the application fails to perform 
>>> checkpoint complaining about:
>>> 
>>> "Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks 
>>> of job ... has been finished"
>>> 
>>> => no parquet part gets written, no checkpoint gets written and no kafka 
>>> offset get committed
>>>  
>>> * In streaming mode, with checkpointing but removing the `setBounded()` on the 
>>> kafka source yields the same result
>>> 
>>> * I also tried in batch mode, removing the checkpoint, switching the 
>>> StreamingFileSink for a FileSink and using Kafka's 
>>> "auto.commit.interval.ms" => in that case I'm getting some parquet output 
>>> and kafka offsets are committed, but the application shuts down before 
>>> flushing the offset of what has been read, s.t. the latest kafka events 
>>> will be

Re: Checkpoints fail when trying to copy from Kafka to S3 since "some tasks have been finished"

2021-08-04 Thread Svend
Hi Robert, 

Thanks for the feed-back.

You are correct, the behavior is indeed different: when I make the source 
bounded, the application eventually stops whereas without that setting it runs 
forever.

In both cases neither checkpoints nor data is being written to the filesystem.

I re-ran the experiment to get more info:

* when making the kafka source unbounded (which is not what I want), I notice 
logs telling that tasks associated with the Kafka source get unregistered at the 
beginning:

2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.Task Source: Kafka Source -> 
Sink: Unnamed (3/8)#0 – Freeing task resources for Source: Kafka Source -> 
Sink: Unnamed (3/8)#0 (70e93520406e12a1e7480a3344f4064d).
2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.TaskExecutor 
flink-akka.actor.default-dispatcher-6 – Un-registering task and sending final 
execution state FINISHED to JobManager for task Source: Kafka Source -> Sink: 
Unnamed (8/8)#0 f02843c05005daced3d5271832f25559.

Later on, it seems each checkpoint fails while complaining that some tasks are 
not running => would that be caused by the finished tasks above?

2021-08-04 10:55:31,463 [INFO org.apa.fli.run.che.CheckpointCoordinator 
Checkpoint Timer – Failed to trigger checkpoint for job 
321fc6243d8aa091bdb8b913b7c3a679 since some tasks of job 
321fc6243d8aa091bdb8b913b7c3a679 has been finished, abort the checkpoint 
Failure reason: Not all required tasks are currently running.


* when setting the kafka source back to bounded, I also notice such an 
"unregistered" log message, before one "Checkpoint Timer – Failed to trigger 
c" log, and then the job finishes


By digging a little this seems similar to this bug: 
https://issues.apache.org/jira/browse/FLINK-2491

I've tried to add a supplementary ProcessFunction to the pipeline just to have 
one more stateful step, in the hope of triggering the checkpoint, though without 
success. 
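
For clarity, the "supplementary ProcessFunction" I mention is roughly the following 
pass-through step, reusing the names from the snippet quoted further down (what I 
actually tried may have differed slightly):

```
// Pass-through ProcessFunction inserted between the source and the sink,
// just to add one more operator to the pipeline.
DataStream<DynamicMessage> withExtraStep = env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .process(new ProcessFunction<DynamicMessage, DynamicMessage>() {
        @Override
        public void processElement(DynamicMessage value, Context ctx, Collector<DynamicMessage> out) {
            out.collect(value);
        }
    });

withExtraStep.sinkTo(parquetSink);
```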




On Tue, 3 Aug 2021, at 1:33 PM, Robert Metzger wrote:
> Hi Svend,
> I'm a bit confused by this statement:
> 
>> * In streaming mode, with checkpointing but removing the `setBounded()` on the 
>> kafka source yields the same result
> 
> My expectation would be that the source runs forever, if it is not bounded.
> Are you sure this error message is not coming from another task?
> 
> 
> On Sat, Jul 31, 2021 at 11:23 AM Svend  wrote:
>> __
>> Hi everyone,
>> 
>> I'm failing to write a simple Flink 1.13.1 application with the DataStream 
>> that reads from kafka and writes to parquet.
>> 
>> My intention is to run this as a job that reads what is currently in kafka, 
>> shuts down when reaching current end of each partition and picks up from 
>> there next time it's started.
>> 
>> I've tried several variations, I can't get anything to work properly:
>> 
>> * In streaming mode, enabling checkpoint and setting the kafkaSource to 
>> bounded (see code sample below), the application fails to perform checkpoint 
>> complaining about:
>> 
>> "Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks 
>> of job ... has been finished"
>> 
>> => no parquet part gets written, no checkpoint gets written and no kafka 
>> offset get committed
>>  
>> * In streaming mode, with checkpointing but removing the `setBounded()` on the 
>> kafka source yields the same result
>> 
>> * I also tried in batch mode, removing the checkpoint, switching the 
>> StreamingFileSink for a FileSink and using Kafka's "auto.commit.interval.ms" 
>> => in that case I'm getting some parquet output and kafka offsets are 
>> committed, but the application shuts down before flushing the offset of what 
>> has been read, s.t. the latest kafka events will be read again at the next 
>> start.
>> 
>> This all sounds very basic, I see other people do this kind of thing 
>> (recently, [1]), and I was really expecting the combination of KafkaSource 
>> with StreamingFileSink and checkpointing to work, since all those are streaming 
>> concepts. Hopefully I'm doing something wrong?
>> 
>> [1] http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser
>> 
>> Thanks a lot in advance,
>> 
>> Svend
>> 
>> ```
>> // I'm testing this by launching the app an IDE
>> 
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class, new 
>> DynamicProtobufKryoSerializer(params));
>> 
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>> env.enableCheckpointing(1000);
>> 
>> env.getCheckpointConfig().setCheck

Checkpoints fail when trying to copy from Kafka to S3 since "some tasks have been finished"

2021-07-31 Thread Svend
Hi everyone,

I'm failing to write a simple Flink 1.13.1 application with the DataStream API that 
reads from Kafka and writes to Parquet.

My intention is to run this as a job that reads what is currently in Kafka, 
shuts down when reaching the current end of each partition and picks up from there 
next time it's started.

I've tried several variations, I can't get anything to work properly:

* In streaming mode, enabling checkpoint and setting the kafkaSource to bounded 
(see code sample below), the application fails to perform checkpoint 
complaining about:

"Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks of 
job ... has been finished"

=> no parquet part gets written, no checkpoint gets written and no kafka offset 
get committed
 
* In streaming mode, with checkpointing but removing the `setBounded()` on the 
kafka source yields the same result

* I also tried in batch mode, removing the checkpoint, switching the 
StreamingFileSink for a FileSink and using Kafka's "auto.commit.interval.ms" => 
in that case I'm getting some parquet output and kafka offsets are committed, 
but the application shuts down before flushing the offset of what has been 
read, s.t. the latest kafka events will be read again at the next start.
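
For what it's worth, the batch-mode variant looked roughly like this (a sketch from 
memory, so the exact Kafka properties and my custom classes may differ slightly):

```
// Batch-mode variant: bounded KafkaSource, no checkpointing, FileSink instead of
// StreamingFileSink, and offsets committed via the Kafka consumer's auto-commit.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("some-topic")
    .setGroupId("my-group")
    .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setBounded(OffsetsInitializer.latest())
    .setProperty("enable.auto.commit", "true")
    .setProperty("auto.commit.interval.ms", "1000")
    .build();

FileSink<DynamicMessage> parquetSink = FileSink
    .forBulkFormat(
        new Path("/tmp/job-output/some-topic.parquet"),
        new ParquetWriterFactory<>((out) -> new ParquetDynamicMessageWriterBuilder(out, params).build()))
    .build();

env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .sinkTo(parquetSink);

env.execute("my-batch-job");
```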

This all sounds very basic, I see other people do this kind of thing (recently, 
[1]), and I was really expecting the combination of KafkaSource with 
StreamingFileSink and checkpointing to work, since all those are streaming concepts. 
Hopefully I'm doing something wrong?

[1] http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser

Thanks a lot in advance,

Svend

```
// I'm testing this by launching the app in an IDE

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class,
    new DynamicProtobufKryoSerializer(params));

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("some-topic")
    .setGroupId("my-group")
    .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setBounded(OffsetsInitializer.latest())
    .build();

StreamingFileSink<DynamicMessage> parquetSink = StreamingFileSink
    .forBulkFormat(
        Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
        new ParquetWriterFactory<>((out) -> new ParquetDynamicMessageWriterBuilder(out, params).build()))
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .sinkTo(parquetSink);

env.execute("my-job");
```

Re: How to configure column width in Flink SQL client?

2021-06-08 Thread Svend
Thanks for the feed-back Ingo,

Do you think a PR would be welcome to make that parameter configurable? At the 
place where I work, UUIDs are often used as column values and they are 36 
characters long => very often a very useful piece of information is not 
readable to us.

I had a quick look, the max width seems to be defined in [1], and used in 
various places like [2] and [3]. Should I open a Jira to discuss this and cc 
you in it?
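
To make the proposal a bit more concrete, here is a purely illustrative sketch of 
the kind of option I have in mind (the option key and default value are invented 
for the sake of discussion, they don't exist today):

```
// Illustrative only: a ConfigOption (org.apache.flink.configuration) that
// PrintUtils/CliTableauResultView could read instead of the hard-coded max
// column width. The key name below is made up.
public static final ConfigOption<Integer> DISPLAY_MAX_COLUMN_WIDTH =
        ConfigOptions.key("sql-client.display.max-column-width")
                .intType()
                .defaultValue(30)
                .withDescription("Maximum width used when printing a column in the SQL client.");
```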

Cheers,

Svend


[1] 
https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L74
[2] 
https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L102
[3] 
https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java#L143


On Tue, 8 Jun 2021, at 7:34 AM, Ingo Bürk wrote:
> Hi Svend,
> 
> unfortunately the column width in the SQL client cannot currently be 
> configured.
> 
> 
> Regards
> Ingo
> 
> On Mon, Jun 7, 2021 at 4:19 PM Svend  wrote:
>> __
>> 
>> Hi everyone,
>> 
>> When using the Flink SQL client and displaying results interactively, it 
>> seems the values of any column wider than 24 characters are truncated, which 
>> is indicated by a '~' character, e.g. the "member_user_id" below:
>> 
>> ```
>> SELECT
>>   metadata.true_as_of_timestamp_millis,
>>   member_user_id,
>>   membership_updated.new_status.updated_value
>> FROM fandom_members_events
>> WHERE
>>group_id = '91170c98-2cc5-4935-9ea6-12b72d32fb3c'
>> 
>> 
>> true_as_of_timestamp_mil~             member_user_id   updated_value
>>             1622811665919  45ca821f-c0fc-4114-bef8-~          (NULL)
>>             1622811665919  45ca821f-c0fc-4114-bef8-~          JOINED
>>             1622118951005  b4734391-d3e1-417c-ad92-~          (NULL)
>> ...
>> ```
>> 
>> Is there a way to configure the displayed width? I didn't find any parameter 
>> for this in 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#sql-client-startup-options
>> 
>> 
>> Thanks a lot in advance!
>> 
>> Svend


How to configure column width in Flink SQL client?

2021-06-07 Thread Svend

Hi everyone,

When using the Flink SQL client and displaying results interactively, it seems 
the values of any column wider than 24 characters are truncated, which is 
indicated by a '~' character, e.g. the "member_user_id" below:

```
SELECT
  metadata.true_as_of_timestamp_millis,
  member_user_id,
  membership_updated.new_status.updated_value
FROM fandom_members_events
WHERE
   group_id = '91170c98-2cc5-4935-9ea6-12b72d32fb3c'


true_as_of_timestamp_mil~             member_user_id   updated_value
            1622811665919  45ca821f-c0fc-4114-bef8-~          (NULL)
            1622811665919  45ca821f-c0fc-4114-bef8-~          JOINED
            1622118951005  b4734391-d3e1-417c-ad92-~          (NULL)
...
```

Is there a way to configure the displayed width? I didn't find any parameter 
for this in 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#sql-client-startup-options


Thanks a lot in advance!

Svend

Re: S3 + Parquet credentials issue

2021-05-31 Thread Svend
Hi Angelo,

I've not worked exactly in the situation you described, although I've had to 
configure S3 access from a Flink application recently and here are a couple of 
things I learnt along the way:

* You should normally not need to include flink-s3-fs-hadoop nor 
hadoop-mapreduce-client-core in your classpath but should rather make 
flink-s3-fs-hadoop available to Flink by putting it into the plugins folder. 
The motivation for that is that this jar is a fat jar containing a lot of 
hadoop and aws classes, s.t. including it in your classpath quickly leads to 
conflicts. The plugins folder is associated with a separate classpath, which 
helps avoid those conflicts.

* Under the hood, Flink is using the hadoop-aws library to connect to s3 => the 
documentation regarding how to configure it, and especially security accesses, 
is available in [1]

* Ideally, when running on AWS, your code should not be using 
BasicAWSCredentialsProvider, but instead the application should assume a role, 
which you associate with some IAM permission.  If that's your case, the 
specific documentation for that situation is in [2]. If you're running some 
test locally on your laptop, BasicAWSCredentialsProvider with some key id and 
secret pointing to a dev account may be appropriate.

* As I understand it, any configuration entry in flink-conf.yaml that starts with 
"fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in [3]) => by 
reading the documentation in [1] and [2] you might be able to figure out which 
parameters are relevant to your case, and you can then set them with the mechanism 
just mentioned. For example, in my case, I simply add this to flink-conf.yaml:

fs.s3a.aws.credentials.provider: 
"com.amazonaws.auth.WebIdentityTokenCredentialsProvider"

* You can debug the various operations that are attempted on S3 by setting this 
logger to DEBUG level:  org.apache.hadoop.fs.s3a


Good luck :) 

Svend



[1] 
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
[2] 
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
[3] 
https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-


On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
> Hello,
> 
> Trying to read a parquet file located in S3 leads to an AWS credentials 
> exception. Switching to another format (raw, for example) works ok regarding 
> file access.
> 
> This is a snippet of code to reproduce the issue:
> 
> static void parquetS3Error() {
> 
>     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
> 
>     TableEnvironment t_env = TableEnvironment.create(settings);
> 
>     // parquet format gives error:
>     // Caused by: java.net.SocketTimeoutException: doesBucketExist on bucket-prueba-medusa: com.amazonaws.AmazonClientException:
>     // No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>     // com.amazonaws.SdkClientException: Failed to connect to service endpoint:
>     t_env.executeSql("CREATE TABLE backup (  `date` STRING,  `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'parquet')");
> 
>     // other formats (i.e. raw) work properly:
>     // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
>     //++
>     //|url |
>     //++
>     //| [80, 65, 82, 49, 21, 0, 21,... |
>     //| [0, 0, 0, 50, 48, 50, 49, 4... |
>     t_env.executeSql("CREATE TABLE backup (  `url` BINARY) WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
> 
>     Table t1 = t_env.from("backup");
> 
>     t1.execute().print();
> 
> }
> Flink version is 1.12.2.
> 
> Please find attached the pom with dependencies and version numbers.
> 
> What would be a suitable workaround for this?
> 
> Thank you very much.
> 
> Angelo.
> 
> 
>  
> 
> *Attachments:*
>  * pom.xml


Re: Flink in k8s operators list

2021-05-31 Thread Svend
Hi Ilya,

Thanks for the kind feed-back.

We hit the first issue you mention related to K8s 1.18+, we then updated the 
controller-gen version to 0.2.4 in the makefile as described in the ticket you 
linked, and then ran "make deploy", which worked around the issue for us.

I'm not aware of the 2nd issue you refer to related to in-progress job? In case 
that helps, we access the Flink-UI by simply opening a port-forward on port 
8081 on the job manager, which among other things shows the currently running 
jobs.

Svend


On Mon, 31 May 2021, at 12:00 PM, Ilya Karpov wrote:
> Hi Svend,
> 
> thank you so much for sharing your experience! The GCP k8s operator looks 
> promising (currently I'm trying to build it and run the helm chart. An issue 
> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/266> 
> with k8s version 1.18+ is a road block right now, but I see that there is a 
> solution), and it also seems like the flink team refers 
> <http://disq.us/p/2f7goq6> to this implementation.
> 
> In your setup did you solve the problem of visualising the list of in-progress 
> jobs?
> 
> > One worrying point though is that the maintainers of the repo seem to have 
> > become silent in March this year.
> Lyft's implementation <https://github.com/lyft/flinkk8soperator> (haven't 
> tried it yet) seems to be even more abandoned (last release 20/04/2020).
> 
>> 29 мая 2021 г., в 11:23, Svend  написал(а):
>> 
>> Hi Ilya,
>> 
>> At my company we're currently using the GCP k8s operator (2nd on your list). 
>> Our usage is very moderate, but so far it works great for us.
>> 
>> We appreciate that when upgrading the application, it triggers automatically 
>> a savepoint during shutdown and resumes from it when restarting. It also 
>> allows to take savepoints at regular intervals (we take one per day 
>> currently).
>> 
>> We're using it with Flink 1.12.4 and AWS EKS.
>> 
>> Getting the Flink metrics and logs exported to our monitoring system worked 
>> out of the box. 
>> 
>> Configuring IAM roles and K8s service account for saving checkpoints and 
>> savepoints to S3 required a bit more fiddling although we got it working. 
>> 
>> Happy to share code snippet about any of that if that's useful :)
>> 
>> It was last updated with Flink 1.11 in mind, so there is currently no 
>> built-in support for the reactive scaling mode recently added in Flink 1.13.
>> 
>> One worrying point though is that the maintainers of the repo seem to have 
>> become silent in March this year. There is a small and active community 
>> around it though and issues and PRs keep on arriving and are waiting for 
>> feed-back. It's all free and OSS, so who are we to complain? Though it's 
>> still an important attention point.
>> 
>> Hope this helps,
>> 
>> Svend
>> 
>> 
>> 
>> 
>> 
>> On Fri, 28 May 2021, at 9:09 AM, Ilya Karpov wrote:
>>> Hi there,
>>> 
>>> I’m making a little research about the easiest way to deploy link job to 
>>> k8s cluster and manage its lifecycle by *k8s operator*. The list of 
>>> solutions is below:
>>> - https://github.com/fintechstudios/ververica-platform-k8s-operator
>>> - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
>>> - https://kudo.dev/docs/examples/apache-flink.html
>>> - https://github.com/wangyang0918/flink-native-k8s-operator
>>> 
>>> If you are using smth that is not listed above please share! Any share 
>>> about how specific solution works is greatly appreciated.
>>> 
>>> Thanks in advance


Re: Idle source configuration per topic with the Kafka Table API connector

2021-05-31 Thread Svend
Awesome, thanks a lot for the clarifications Jing Zhang, it's very useful.

Best,

Svend

On Sun, 30 May 2021, at 6:27 AM, JING ZHANG wrote:
> Hi Svend,
> Your solution could work well in Flink 1.13.0 and later, because those 
> versions provide many related improvements.
> 
> > as per [1]
> Yes, "table.exec.source.idle-timeout" is not a table-level parameter but a 
> global one. It applies to all table sources that declare a watermark clause 
> but do not use SOURCE_WATERMARK().
> > as per [2]
> Yes.
> > If that is correct, I guess I can simply use the DataStream connector for 
> > that specific topic and then convert it to a Table.
> Yes, and please use SOURCE_WATERMARK() when converting the DataStream to a Table, 
> like in the following demo:
> Table table =
>     tableEnv.fromDataStream(
>         dataStream,
>         Schema.newBuilder()
>             // ... other schema logic
>             .watermark("columnName", "SOURCE_WATERMARK()")
>             .build());
> I would like to invite Jark and Timo to double-check; they are more familiar 
> with this issue.
> 
> Best,
> JING ZHANG
> 
> 
> Svend  于2021年5月29日周六 下午3:34写道:
>> __
>> Hi everyone,
>> 
>> My Flink streaming application consumes several Kafka topics, one of which 
>> receives traffic in a burst once per day.
>> 
>> I would like that topic not to hold back the progress of the watermark.
>> 
>> Most of my code is currently using the SQL API and in particular the Table 
>> API Kafka connector.
>> 
>> I have read about the idle source configuration mechanism, could you please 
>> confirm my understanding that:
>> 
>> * as per [1]: when I'm using the Table API Kafka connector, we currently do 
>> not have the possibility to specify the idle source parameter specifically 
>> for each topic, although we can set it globally on the 
>> StreamTableEnvironment with the "table.exec.source.idle-timeout" parameter
>> 
>> * as per [2]: when using the DataStream Kafka connector, we can set the idle 
>> source parameter specifically for each topic by specifying ".withIdleness()" 
>> to the WatermarkStrategy.
>> 
>> If that is correct, I guess I can simply use the DataStream connector for 
>> that specific topic and then convert it to a Table.
>> 
>> Thanks a lot!
>> 
>> Svend
>> 
>> 
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks
>> 
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission
>> 
>> 
>> 


Re: Flink in k8s operators list

2021-05-29 Thread Svend
Hi Ilya,

At my company we're currently using the GCP k8s operator (2nd on your list). 
Our usage is very moderate, but so far it works great for us.

We appreciate that when upgrading the application, it triggers automatically a 
savepoint during shutdown and resumes from it when restarting. It also allows 
to take savepoints at regular intervals (we take one per day currently).

We're using it with Flink 1.12.4 and AWS EKS.

Getting the Flink metrics and logs exported to our monitoring system worked out 
of the box. 

Configuring IAM roles and K8s service account for saving checkpoints and 
savepoints to S3 required a bit more fiddling although we got it working. 

Happy to share code snippet about any of that if that's useful :)

It was last updated with Flink 1.11 in mind, so there is currently no built-in 
support for the reactive scaling mode recently added in Flink 1.13.

One worrying point though is that the maintainers of the repo seem to have 
become silent in March this year. There is a small and active community around 
it though and issues and PRs keep on arriving and are waiting for feed-back. 
It's all free and OSS, so who are we to complain? Though it's still an 
important attention point.

Hope this helps,

Svend





On Fri, 28 May 2021, at 9:09 AM, Ilya Karpov wrote:
> Hi there,
> 
> I’m making a little research about the easiest way to deploy link job to k8s 
> cluster and manage its lifecycle by *k8s operator*. The list of solutions is 
> below:
> - https://github.com/fintechstudios/ververica-platform-k8s-operator
> - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> - https://kudo.dev/docs/examples/apache-flink.html
> - https://github.com/wangyang0918/flink-native-k8s-operator
> 
> If you are using smth that is not listed above please share! Any share about 
> how specific solution works is greatly appreciated.
> 
> Thanks in advance


Idle source configuration per topic with the Kafka Table API connector

2021-05-29 Thread Svend
Hi everyone,

My Flink streaming application consumes several Kafka topics, one of which 
receives traffic in a burst once per day.

I would like that topic not to hold back the progress of the watermark.

Most of my code is currently using the SQL API and in particular the Table API 
Kafka connector.

I have read about the idle source configuration mechanism, could you please 
confirm my understanding that:

* as per [1]: when I'm using the Table API Kafka connector, we currently do not 
have the possibility to specify the idle source parameter specifically for each 
topic, although we can set it globally on the StreamTableEnvironment with the 
"table.exec.source.idle-timeout" parameter

* as per [2]: when using the DataStream Kafka connector, we can set the idle 
source parameter specifically for each topic by specifying ".withIdleness()" to 
the WatermarkStrategy.

If that is correct, I guess I can simply use the DataStream connector for that 
specific topic and then convert it to a Table.
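
For my own notes, here is roughly the shape I have in mind for that bursty topic; 
the event type, source and durations below are made up, and the other topics would 
keep relying on the global "table.exec.source.idle-timeout" setting:

```
// Per-topic idleness for the bursty topic, handled at the DataStream level and
// then converted to a Table. Names and durations are illustrative only.
WatermarkStrategy<BurstyEvent> strategy = WatermarkStrategy
    .<BurstyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
    .withIdleness(Duration.ofMinutes(1));

DataStream<BurstyEvent> burstyStream =
    env.fromSource(burstyKafkaSource, strategy, "bursty-topic");

Table burstyTable = tableEnv.fromDataStream(
    burstyStream, $("user_id"), $("event_time").rowtime());
tableEnv.createTemporaryView("bursty_events", burstyTable);

// Global fallback for the Table API Kafka sources, as discussed above:
tableEnv.getConfig().getConfiguration()
    .setString("table.exec.source.idle-timeout", "30 s");
```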

Thanks a lot!

Svend



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission




How to process recent events from Kafka and older ones from another storage?

2021-05-10 Thread Svend
Hi everyone,

What is the typical architectural approach with Flink SQL for processing recent 
events from Kafka and older events from some separate cheaper storage?

I currently have the following situation in mind:

* events are appearing in Kafka and retained there for, say, 1 month
* events are also regularly forwarded to AWS S3 into Parquet files

Given a use case, or a bugfix, that requires (re)processing events older than 
1 month, how do we design a Flink SQL pipeline that produces correct results?

* one possibility I envisaged is simply to first start the Flink SQL 
application using a FileSystem connector to read the events from Parquet, then 
to shut it down while triggering a savepoint and finally resume it from that 
savepoint while now using the Kafka connector (a rough sketch of the two table 
definitions follows after this list). I saw some presentations where engineers 
from Lyft were discussing that approach. From what I understand though, the 
FileSystem connector currently does not emit watermarks (I understand 
https://issues.apache.org/jira/browse/FLINK-21871 is addressing just that), so 
if I get things correctly that would imply that none of my Flink SQL code can 
depend on event time, which seems very restrictive.

* another option is to use infinite retention in Kafka, which is expensive, or 
to copy old events from S3 back to Kafka when we need to process them.
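
To make the first option above a bit more concrete, the idea is to declare the same 
schema twice, once over the Parquet files for the backfill run and once over Kafka 
for the live run, so that the rest of the SQL stays unchanged. All names, paths and 
options below are made up:

```
// Backfill run: read the historical events from Parquet on S3.
tableEnv.executeSql(
    "CREATE TABLE events ( "
  + "  user_id STRING, "
  + "  event_time TIMESTAMP(3) "
  + ") WITH ( "
  + "  'connector' = 'filesystem', "
  + "  'path' = 's3a://my-bucket/events/', "
  + "  'format' = 'parquet' )");

// Live run (after restoring from the savepoint): same schema, Kafka connector instead.
tableEnv.executeSql(
    "CREATE TABLE events ( "
  + "  user_id STRING, "
  + "  event_time TIMESTAMP(3), "
  + "  WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE "
  + ") WITH ( "
  + "  'connector' = 'kafka', "
  + "  'topic' = 'events', "
  + "  'properties.bootstrap.servers' = 'localhost:9092', "
  + "  'properties.group.id' = 'backfill-demo', "
  + "  'scan.startup.mode' = 'group-offsets', "
  + "  'format' = 'json' )");
```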

Since streaming in Flink, data retention in Kafka and pipeline backfilling are 
such common concepts, I am imagining that many teams are addressing the 
situation I'm describing above already.

What is the usual way of approaching this?

Thanks a lot in advance






Re: How to create a java unit test for Flink SQL with nested field and watermark?

2021-04-30 Thread Svend
Thanks for the feedback. 

The CSV is a good idea and will make my tests more readable, I'll use that. 

Looking forward to Flink 1.13 !
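
In the meantime, for the record, this is roughly how I understand the Flink 1.13 
version of my test setup would look (an untested sketch based on what I read about 
the new fromDataStream, so details may be off):

```
// Sketch of the Flink 1.13 approach: declare the schema, including the nested
// field and the watermark, directly when converting the DataStream to a Table.
Table testDataTable = tableEnv.fromDataStream(
    testStream,
    Schema.newBuilder()
        .column("created", DataTypes.ROW(DataTypes.FIELD("fandom_id", DataTypes.STRING())))
        .column("event_time", DataTypes.TIMESTAMP(3))
        .watermark("event_time", "event_time - INTERVAL '10' MINUTE")
        .build());
tableEnv.createTemporaryView("post_events_kafka", testDataTable);
```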

Svend 




On Fri, 30 Apr 2021, at 9:09 AM, Timo Walther wrote:
> Hi,
> 
> there are multiple ways to create a table for testing:
> 
> - use the datagen connector
> - use the filesystem connector with CSV data
> - and beginning from Flink 1.13 your code snippets become much simpler
> 
> Regards,
> Timo
> 
> On 29.04.21 20:35, Svend wrote:
> > I found an answer to my own question!
> > 
> > For future reference, the snippet below allows to create a SQL table with 
> > a nested field and a watermark, filled with hard-coded values, which 
> > is all I need in order to test SQL expressions.
> > 
> > It's quite a mouthful though, is there a more succinct way to express the 
> > same thing?
> > 
> > 
> > var testData = List.of(
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
> > );
> > var testStream = streamEnv
> >     .fromCollection(testData,
> >         Types.ROW_NAMED(new String[] {"created", "event_time"},
> >             Types.ROW_NAMED(new String[] {"fandom_id"}, Types.STRING),
> >             Types.SQL_TIMESTAMP))
> >     .assignTimestampsAndWatermarks(WatermarkStrategy
> >         .forBoundedOutOfOrderness(Duration.ofMinutes(10))
> >         .withTimestampAssigner(
> >             TimestampAssignerSupplier.of((t2, t) -> ((Timestamp) t2.getField(1)).getTime())));
> > var testDataTable = tableEnv.fromDataStream(testStream, $("created"), $("event_time").rowtime());
> > tableEnv.createTemporaryView("post_events_kafka", testDataTable);
> > 
> > 
> > 
> > 
> > 
> > On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
> >> I'm trying to write java unit test for a Flink SQL application using 
> >> Flink mini cluster, but I do not manage to create an input table with 
> >> nested fields and time characteristics.
> >>
> >> I had a look at the documentation and examples below, although I'm 
> >> still struggling:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> >>  
> >> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html>
> >> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
> >>  
> >> <https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java>
> >>
> >>
> >> Consider for example this simple expression that I want to test and 
> >> which depends on the nested field "created.group_id" and expects 
> >> "metricValue" to be the row time:
> >>
> >>
> >> var createTableDDl = ""
> >>     + " CREATE TEMPORARY VIEW postCreated10min                           \n"
> >>     + " AS                                                               \n"
> >>     + " SELECT                                                           \n"
> >>     + "   created.group_id as groupId,                                   \n"
> >>     + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime,   \n"
> >>     + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime,  \n"
> >>     + "   count(1) as metricValue                                        \n"
> >>     + " FROM post_events_kafka                                           \n"
> >>     + " GROUP BY                                                         \n"
> >>     + "   created.group_id,                                              \n"
> >>     + "   TUMBLE(event_time, INTERVAL '10' MINUTES)

Re: How to create a java unit test for Flink SQL with nested field and watermark?

2021-04-29 Thread Svend
I found an answer to my own question!

For future reference, the snippet below allows to create a SQL table with a 
nested field and a watermark, filled with hard-coded values, which is all I 
need in order to test SQL expressions.

It's quite a mouthful though, is there a more succinct way to express the same thing?


var testData = List.of(
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
);
var testStream = streamEnv
    .fromCollection(testData,
        Types.ROW_NAMED(new String[] {"created", "event_time"},
            Types.ROW_NAMED(new String[] {"fandom_id"}, Types.STRING),
            Types.SQL_TIMESTAMP))
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofMinutes(10))
        .withTimestampAssigner(
            TimestampAssignerSupplier.of((t2, t) -> ((Timestamp) t2.getField(1)).getTime())));
var testDataTable = tableEnv.fromDataStream(testStream, $("created"), $("event_time").rowtime());
tableEnv.createTemporaryView("post_events_kafka", testDataTable);




On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
> I'm trying to write java unit test for a Flink SQL application using Flink 
> mini cluster, but I do not manage to create an input table with nested fields 
> and time characteristics.
> 
> I had a look at the documentation and examples below, although I'm still 
> struggling:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
> 
> 
> Consider for example this simple expression that I want to test and which 
> depends on the nested field "created.group_id" and expects "metricValue" to 
> be the row time:
> 
> 
> var createTableDDl = ""
>     + " CREATE TEMPORARY VIEW postCreated10min                           \n"
>     + " AS                                                               \n"
>     + " SELECT                                                           \n"
>     + "   created.group_id as groupId,                                   \n"
>     + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime,   \n"
>     + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime,  \n"
>     + "   count(1) as metricValue                                        \n"
>     + " FROM post_events_kafka                                           \n"
>     + " GROUP BY                                                         \n"
>     + "   created.group_id,                                              \n"
>     + "   TUMBLE(event_time, INTERVAL '10' MINUTES)                      \n";
> tableEnv.executeSql(createTableDDl);
> 
> 
> In a unit test, the following syntax allows me to create test input data with 
> nested fields, but I have not found how to specify row time nor watermarks 
> with this approach:
> 
> 
> Table testTable = tableEnv.fromValues(
>   DataTypes.ROW(
> DataTypes.FIELD("created",
>   DataTypes.ROW(
> DataTypes.FIELD("group_id", DataTypes.STRING())
>   )
> ),
> DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
>   ),
>  
>   row(row("group123"), "2021-02-03 11:36:20"),
>   row(row("group123"), "2021-02-03 11:38:20"),
>   row(row("group123"), "2021-02-03 11:40:20")
> );
> tableEnv.createTemporaryView("post_events_kafka", testTable);
> 
> 
> I have also tried the following syntax, which allows to specify watermark and 
> row time, but I have not found how to create a nested field with this 
> approach:
> 
> 
> var testData = List.of(
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
> );
> var testStream = streamEnv
>   .fromCollection(testData)
>   .assignTimestampsAndWatermarks(WatermarkStrategy
>   .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
>   .withTimestampAssigner(
> TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
>   );
> var testDataTable = tableEnv.fromDataStream(
>   testStream,
>   $("group_id"), $("true_as_of"), $("event_time").rowtime()
> );
> tableEnv.createTemporaryView("post_events_kafka", testDataTable);
> 
> 
> 
> What am I missing?


How to create a java unit test for Flink SQL with nested field and watermark?

2021-04-29 Thread Svend
I'm trying to write a Java unit test for a Flink SQL application using the Flink 
mini cluster, but I do not manage to create an input table with nested fields and 
time characteristics.

I had a look at the documentation and examples below, although I'm still 
struggling:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java


Consider for example this simple expression that I want to test and which 
depends on the nested field "created.group_id" and expects "metricValue" to be 
the row time:


var createTableDDl = ""
    + " CREATE TEMPORARY VIEW postCreated10min                           \n"
    + " AS                                                               \n"
    + " SELECT                                                           \n"
    + "   created.group_id as groupId,                                   \n"
    + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime,   \n"
    + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime,  \n"
    + "   count(1) as metricValue                                        \n"
    + " FROM post_events_kafka                                           \n"
    + " GROUP BY                                                         \n"
    + "   created.group_id,                                              \n"
    + "   TUMBLE(event_time, INTERVAL '10' MINUTES)                      \n";
tableEnv.executeSql(createTableDDl);


In a unit test, the following syntax allows me to create test input data with 
nested fields, but I have not found how to specify row time nor watermarks with 
this approach:


Table testTable = tableEnv.fromValues(
  DataTypes.ROW(
DataTypes.FIELD("created",
  DataTypes.ROW(
DataTypes.FIELD("group_id", DataTypes.STRING())
  )
),
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
  ),
 
  row(row("group123"), "2021-02-03 11:36:20"),
  row(row("group123"), "2021-02-03 11:38:20"),
  row(row("group123"), "2021-02-03 11:40:20")
);
tableEnv.createTemporaryView("post_events_kafka", testTable);


I have also tried the following syntax, which allows to specify watermark and 
row time, but I have not found how to create a nested field with this approach:


var testData = List.of(
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
);
var testStream = streamEnv
  .fromCollection(testData)
  .assignTimestampsAndWatermarks(WatermarkStrategy
  .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
  .withTimestampAssigner(
TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
  );
var testDataTable = tableEnv.fromDataStream(
  testStream,
  $("group_id"), $("true_as_of"), $("event_time").rowtime()
);
tableEnv.createTemporaryView("post_events_kafka", testDataTable);



What am I missing?

Re: how to convert DataStream to Table

2021-04-12 Thread Svend
Hi,

Here's an example that works for me:


"""
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;

import java.util.List;

public class Stream2Table {

    public static void main(String[] args) {

        var streamingEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        var tableEnv = StreamTableEnvironment.create(streamingEnv);

        var userRows = streamingEnv.fromCollection(
            List.of(
                Row.of("user1", "al...@mail.org", "Alice"),
                Row.of("user2", "b...@mail.org", "Bob")
            ),
            new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING));

        var table = tableEnv
            .fromDataStream(userRows,
                $("user_id"), $("handle"), $("name"));

        table.execute().print();
    }

}
"""

You can also dig here, you'll probably find better examples
https://github.com/apache/flink/tree/master/flink-examples/flink-examples-table

Cheers,

Svend


On Sun, 11 Apr 2021, at 3:35 PM, vtygoss wrote:
> Hi All,
> 
> there is a scenario where I need to process OGG Log data in kafka using Flink 
> Sql. I can convert the OGG Log Stream to DataStream and each event 
> has RowKind, but I have trouble converting the DataStream to a Table.
> For a test, I tried the StreamTableEnvironment#fromDataStream and 
> createTemporaryView APIs; in both cases the TableSchema is:
> 
> ```
> root
>  |-- f0: LEGACY('RAW', 'ANY')
> ```
> 
> but I want to get this schema:
> 
> ```
> root
>  |— column1: Type,
>  |— column2: Type,
> ...
> ```
> 
> How do I convert a DataStream with RowKind to a Table?
> 
> Thank you very much for your reply


Failed to register Protobuf Kryo serialization

2021-02-14 Thread Svend Vanderveken
Hi all,

I'm failing to set up an example of wire serialization with Protobuf, could
you help me figure out what I'm doing wrong?

I'm using a simple protobuf schema:

```

syntax = "proto3";

import "google/protobuf/wrappers.proto";
option java_multiple_files = true;

message DemoUserEvent {
  Metadata metadata = 1;
  oneof payload {
Created created = 10;
Updated updated = 11;
  }

  message Created {...}

  message Updated {...}

  ...

}

```


From which I'm generating Java classes using this Gradle plugin:


```

plugins {
id "com.google.protobuf" version "0.8.15"
}

```


And I'm generating DemoUserEvent instances with a Java Iterator looking like this:


```

public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable {
    transient public final static Faker faker = new Faker();
    ...
    @Override public DemoUserEvent next() {
        return randomCreatedEvent();
    }
    ...

```


I read those two pieces of documentation:
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html

And tried the demo app below:

```

import com.twitter.chill.protobuf.ProtobufSerializer;

...

public static void main(String[] args) {
    final StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
    flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print();
}

```

But the serialization mechanism still fails to handle my protobuf class:

11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 [] - class live.schema.event.user.v1.DemoUserEvent does not
contain a getter for field payloadCase_
11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 [] - class live.schema.event.user.v1.DemoUserEvent does not
contain a setter for field payloadCase_
11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 [] - Class class live.schema.event.user.v1.DemoUserEvent
cannot be used as a POJO type because not all fields are valid POJO
fields, and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the
effect on performance.

I've also tried this, without success:

```

flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class,
ProtobufSerializer.class);

```


I'm using those versions:

```

ext {
javaVersion = '11'
flinkVersion = '1.12.1'
scalaBinaryVersion = '2.12'
}

dependencies {
compileOnly
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
implementation ("com.twitter:chill-protobuf:0.9.5") {
exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
}
implementation "com.google.protobuf:protobuf-java:3.14.0"
implementation 'com.github.javafaker:javafaker:1.0.2'
}

```


Any idea what I should try next?

Thanks in advance!