Setting producerConfig properties for FlinkKafkaProducer

2021-11-19 Thread Darius Kasad
I've overridden the v1.11.3 FlinkKafkaConsumer's 'open' method in order to set
the TLS configuration for Kafka from the task manager node where the Kafka
consumer is running (the TLS configuration differs between the job manager and
each task manager in our environment, which is why we use 'open' instead of
setting the configuration at construction time).

However, when I do the same for FlinkKafkaProducer, I've noticed that
'initializeState', which creates a Kafka producer, is called before 'open'.
This causes an exception, since a Kafka producer is created with the wrong
configuration (i.e. the config from the job manager where the
FlinkKafkaProducer was constructed).

What is the appropriate way to set up the configuration for a
FlinkKafkaProducer so that I can read the config on the task manager node
where the producer is executing? I can override both 'open' and
'initializeState' to set up the config; this solution works, but is there a
better alternative (e.g. 'createProducer', etc.)? What about the v1.14.x
KafkaSource/KafkaSink?
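
A rough sketch of the override-both approach mentioned above, assuming the
connector's 'producerConfig' Properties field is visible to subclasses in the
version in use; the subclass name and the loadLocalTlsProperties() helper are
purely illustrative:

import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

public class NodeLocalTlsKafkaProducer<T> extends FlinkKafkaProducer<T> {

    public NodeLocalTlsKafkaProducer(
            String topic,
            KafkaSerializationSchema<T> serializationSchema,
            Properties producerConfig,
            Semantic semantic) {
        super(topic, serializationSchema, producerConfig, semantic);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // initializeState() runs on the task manager before open() and already
        // creates a KafkaProducer, so the node-local TLS settings must be in
        // place before delegating to the superclass.
        applyLocalTlsConfig();
        super.initializeState(context);
    }

    @Override
    public void open(Configuration configuration) throws Exception {
        applyLocalTlsConfig();
        super.open(configuration);
    }

    private void applyLocalTlsConfig() {
        // Assumes 'producerConfig' (the producer Properties held by
        // FlinkKafkaProducer) is accessible to subclasses.
        producerConfig.putAll(loadLocalTlsProperties());
    }

    private Properties loadLocalTlsProperties() {
        // Illustrative helper: read keystore/truststore locations and passwords
        // from wherever this task manager node stores them.
        Properties tls = new Properties();
        // tls.setProperty("security.protocol", "SSL");
        // tls.setProperty("ssl.truststore.location", "/etc/kafka/tls/truststore.jks");
        return tls;
    }
}

Both hooks run on the task manager, so loadLocalTlsProperties() reads the
node-local configuration rather than the job manager's.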

Thanks,
Darius


Deserialize generic kafka json message in pyflink. Single kafka topic, multiple message schemas (debezium).

2021-11-19 Thread Kamil ty
Hello all,

I'm working on a pyflink job that's supposed to consume JSON messages from
Kafka and save them to a partitioned Avro file sink.
I'm having difficulties finding a solution for how to process the messages,
because there is only one Kafka topic for multiple message schemas. As
PyFlink's FlinkKafkaConsumer expects a JsonRowDeserializationSchema, I assume
that all of the messages need a constant, predefined schema. I expect the same
for the Kafka Table API.

The messages follow the general Debezium message schema. Example data taken
from the Flink docs:

{
  "schema": {...},
  "payload": {
    "before": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.18
    },
    "after": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.15
    },
    "source": {...},
    "op": "u",
    "ts_ms": 1589362330904,
    "transaction": null
  }
}

The messages arrive on a single Kafka topic, where the 'schema', 'after' and
'before' fields can differ for each message. The Kafka message key also
contains the 'schema' field from the above example. My question is whether
there is a way to process such messages coming from a single Kafka topic with
pyflink without writing a custom DeserializationSchema.
Any help would be appreciated.

Kind Regards
Kamil


Re: How do I configure commit offsets on checkpoint in new KafkaSource api?

2021-11-19 Thread Mason Chen
Hi Marco,

https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#additional-properties
In the new KafkaSource, you can configure it in your properties. You can
take a look at `KafkaSourceOptions#COMMIT_OFFSETS_ON_CHECKPOINT` for the
specific config, which defaults to true if you have checkpointing enabled.
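
A minimal sketch of where that property goes on the builder; the broker
address, topic and group id are placeholders, and the setProperty call only
makes the default explicit:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CommitOnCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed as part of each checkpoint

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("my-topic")                // placeholder
                .setGroupId("my-group")               // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // "commit.offsets.on.checkpoint" -- already true by default when
                // checkpointing is enabled; set explicitly here for clarity.
                .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("commit-offsets-on-checkpoint");
    }
}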

Best,
Mason

On Fri, Nov 19, 2021 at 7:45 AM Marco Villalobos wrote:

> The FlinkKafkaConsumer that will be deprecated has the method
> "setCommitOffsetsOnCheckpoints(boolean)".
>
> However, that functionality is not in the new KafkaSource class.
>
> How is this behavior / functionality configured in the new API?
>
> -Marco A. Villalobos
>
>
>


How do I configure commit offsets on checkpoint in new KafkaSource api?

2021-11-19 Thread Marco Villalobos
The FlinkKafkaConsumer that will be deprecated has the method
"setCommitOffsetsOnCheckpoints(boolean)".

However, that functionality is not in the new KafkaSource class.

How is this behavior / functionality configured in the new API?

-Marco A. Villalobos


Re: FlinkSQL writing a Hive ORC table with compaction enabled: compaction does not take effect when the job keeps restarting on errors?

2021-11-19 Thread yidan zhao
Hi, still on this issue: is there a reliable way to tell whether a given file
is useless or still needed?
For example, one rather complex approach: determine how far the job has
progressed, say to 14:00, assume the 12:00 data is then complete, and delete
all files starting with '.' in the 12:00 partition. But that decision has to
take the job's watermark into account to see how far the job has actually run,
which makes it fairly complex.
By the way, would the success file work? Is the success file written only
after compaction finishes, or is it written first and compaction happens
afterwards? If it is the former, I could write a cron script that walks the
directories and, for every directory that contains a success file, deletes all
files in it that start with '.'.
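
A rough Java sketch of the kind of cleanup script described above, assuming
the files sit on HDFS, that the success file uses the default _SUCCESS name,
and, crucially, that deleting '.'-prefixed files next to a _SUCCESS marker is
actually safe, which is exactly the open question here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HiddenFileCleaner {
    public static void main(String[] args) throws Exception {
        // Placeholder table location; in practice pass the real path as an argument.
        Path tableRoot = new Path(args.length > 0 ? args[0] : "hdfs:///warehouse/db/table");
        FileSystem fs = FileSystem.get(tableRoot.toUri(), new Configuration());

        // Only one level of partition directories is handled in this sketch.
        for (FileStatus partition : fs.listStatus(tableRoot)) {
            if (!partition.isDirectory()) {
                continue;
            }
            Path successFile = new Path(partition.getPath(), "_SUCCESS");
            if (!fs.exists(successFile)) {
                continue; // partition not marked complete yet, leave it alone
            }
            for (FileStatus file : fs.listStatus(partition.getPath())) {
                // Delete only the invisible '.'-prefixed files left behind by
                // failed checkpoints; whether this is always safe is the open
                // question in this thread.
                if (file.getPath().getName().startsWith(".")) {
                    fs.delete(file.getPath(), false);
                }
            }
        }
        fs.close();
    }
}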

yidan zhao wrote on Wed, Nov 17, 2021 at 10:22 AM:

> Also, when the job is restarted from a checkpoint, data completeness is ultimately not a problem, right?
>
> yidan zhao wrote on Wed, Nov 17, 2021 at 10:22 AM:
>
>> The errors were caused by unstable machines, TM timeouts and so on.
>> Is there any way to identify such files so they can be cleaned up periodically?
>>
>> Caizhi Weng wrote on Wed, Nov 17, 2021 at 9:50 AM:
>>
>>> Hi!
>>>
>>> Because compaction is performed at each checkpoint, the files produced
>>> before a checkpoint all start with '.', meaning they are not yet visible.
>>> They are only renamed to visible files after the checkpoint. So if the job
>>> fails frequently, these invisible files stay in the directory and the
>>> number of files grows. I suggest first finding out why the job fails so
>>> frequently.
>>>
>>> yidan zhao wrote on Tue, Nov 16, 2021 at 5:36 PM:
>>>
>>> >
>>> > As the subject says: I have not checked in detail whether compaction is
>>> > not taking effect at all or simply cannot keep up. When the job runs
>>> > normally there is no problem, but when it hits frequent errors and
>>> > restarts every little while, the number of files explodes and compaction
>>> > does not take effect.
>>> >
>>>
>>


Replacing S3 Client in Hadoop plugin

2021-11-19 Thread Tamir Sagi
Hey Martijn,

Sorry for the late response.

We wanted to replace the default client with our own custom S3 client rather
than use the AmazonS3Client provided by the plugin.

We used flink-s3-fs-hadoop v1.12.2, and for our needs we had to upgrade to
v1.14.0 [1].

The AmazonS3 client factory is initialized in [2]; if the property
"fs.s3a.s3.client.factory.impl" [3] is not provided, the default factory is
created [4], which provides an AmazonS3Client - and that does not support what
we need.
I know that both the property and the factory interface are annotated with

@InterfaceAudience.Private
@InterfaceStability.Unstable

from a very early version on, but we found this approach cleaner than
extending the whole class and overriding the #setAmazonS3Client method.

Bottom line, all we had to do was create our own implementation of the
S3ClientFactory interface [5], add to flink-conf.yaml:
s3.s3.client.factory.impl: <fully-qualified name of our factory class>
and place both the plugin and our artifact (with the factory and client
implementation) under ${FLINK_HOME}/plugins/s3.
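
A rough sketch of such a factory: the class name is illustrative, the
createS3Client signature is assumed from the Hadoop 3.2.x interface linked as
[5] (it changed in later Hadoop releases, so it must match the hadoop-aws
version actually bundled in the plugin), and the builder body is only a
stand-in for whatever custom AmazonS3 implementation is needed:

import java.io.IOException;
import java.net.URI;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.S3ClientFactory;

// Registered via "s3.s3.client.factory.impl" in flink-conf.yaml, which the
// plugin forwards to "fs.s3a.s3.client.factory.impl".
public class CustomS3ClientFactory extends Configured implements S3ClientFactory {

    // NOTE: signature assumed from the Hadoop 3.2.x S3ClientFactory in [5];
    // it differs in other Hadoop releases, so match the bundled hadoop-aws version.
    @Override
    public AmazonS3 createS3Client(URI name, String bucket, AWSCredentialsProvider credentials)
            throws IOException {
        // Build whatever AmazonS3 implementation is needed (client-side encryption,
        // custom endpoint, interceptors, ...) instead of the stock AmazonS3Client.
        // Region/endpoint configuration is omitted in this sketch.
        return AmazonS3ClientBuilder.standard()
                .withCredentials(credentials)
                .build();
    }
}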

One important note: the flink-s3-fs-hadoop plugin bundles the whole
com.amazonaws.s3 source code. To avoid plugin classloader issues, we had to
remove the aws-java-sdk-s3 dependency and declare the plugin dependency with
scope "provided". If the job itself needs to do some work with S3, shading
com.amazonaws was also necessary.

[1] 
https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.14.0

[2] 
https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L264-L266

[3] 
https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java#L366-L369

[4] 
https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java#L66

[5] 
https://github.com/apache/hadoop/blob/branch-3.2.2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java

Best,
Tamir.


From: Martijn Visser 
Sent: Wednesday, October 13, 2021 8:28 PM
To: Tamir Sagi ; user@flink.apache.org 

Subject: Re: Replacing S3 Client in Hadoop plugin


Hi,

Could you elaborate on why you would like to replace the S3 client?

Best regards,

Martijn

On Wed, 13 Oct 2021 at 17:18, Tamir Sagi <tamir.s...@niceactimize.com> wrote:
I found the dependency


<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>3.3.1</version>
</dependency>


Apparently it's possible; there is a method
setAmazonS3Client.

I think I found the solution.

Thanks.

Tamir.


From: Tamir Sagi <tamir.s...@niceactimize.com>
Sent: Wednesday, October 13, 2021 5:44 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Replacing S3 Client in Hadoop plugin

Hey community.

I would like to know if there is any way to replace the S3 client in the
Hadoop plugin [1] with a custom client (AmazonS3).

I did notice that the Hadoop plugin supports replacing the implementation of
S3AFileSystem using "fs.s3a.impl" (in flink-conf.yaml it will be "s3.impl"),
but not the client itself [2]:

<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  <description>The implementation class of the S3A Filesystem</description>
</property>

I delved into the Hadoop plugin source code [3]; the client itself is of type
AmazonS3Client and cannot be replaced with, for example, a client of type
AmazonS3EncryptionV2.


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins

[2] 
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
[3] 
https://github.com/apache/hadoop/blob/master/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java


SQL parsing question (subject garbled in the archive)

2021-11-19 Thread (sender name garbled)
Spark and Hive use ANTLR to parse SQL, while Flink uses Calcite (JavaCC) to
parse SQL. The rest of the message is garbled in the archive, but it appears
to ask about the choice of ANTLR.