Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Chen Yu
Hi Chen,

You should tell Flink which table to insert into by writing "INSERT INTO xxx SELECT xxx".

For a single non-insert query, Flink will collect the output to the console
automatically, so it also works without an INSERT.

But you must specify the target table explicitly when you need to write data to
external storage.

For example:

String relateQuery = "INSERT INTO xxx SELECT correlator_id, name, relationship FROM Correlation";


Best,
Yu Chen

Get Outlook for iOS

From: Zhanghao Chen
Sent: Wednesday, December 6, 2023 7:21:50 PM
To: elakiya udhayanan ; user@flink.apache.org
Subject: Re: Query on using two sinks for a Flink job (Flink SQL)

Hi Elakiya,

You can try executing TableEnvironmentImpl#executeInternal for non-insert
statements, then using StatementSet.addInsertSql to add multiple insert
statements, and finally calling StatementSet#execute.
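
For illustration, a minimal sketch of that pattern (table names and queries are
placeholders, and the public executeSql is used here for the DDLs instead of the
internal executeInternal):

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Non-insert statements: source table plus both sink tables.
    tEnv.executeSql("CREATE TABLE person_source (...) WITH ('connector' = 'upsert-kafka', ...)");
    tEnv.executeSql("CREATE TABLE postgres_sink (...) WITH ('connector' = 'jdbc', ...)");
    tEnv.executeSql("CREATE TABLE kafka_sink (...) WITH ('connector' = 'upsert-kafka', ...)");

    // Group both INSERTs into one StatementSet so they are submitted as a single job,
    // avoiding the "Cannot have more than one execute() or executeAsync() call" error.
    StatementSet stmtSet = tEnv.createStatementSet();
    stmtSet.addInsertSql("INSERT INTO postgres_sink SELECT ... FROM person_source");
    stmtSet.addInsertSql("INSERT INTO kafka_sink SELECT ... FROM person_source");
    stmtSet.execute();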

Best,
Zhanghao Chen

From: elakiya udhayanan 
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org 
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,
I would like to know whether it is possible to have two sinks in a single Flink
job. In my case I am running a Flink SQL based job where I consume from two
different Kafka topics using CREATE TABLE DDLs (sample below), correlate them
with a join condition, and currently write the result to an external database
(PostgreSQL) as the sink. I would like to know if I can add another sink that
also writes the result to a Kafka topic (as the second sink).
I tried using two SQL scripts (two CREATE and two INSERT statements) but ran
into the exception "Cannot have more than one execute() or executeAsync() call
in a single environment. at "
I also tried the StatementSet functionality, which again gave me an exception
"org.apache.flink.table.api.TableException: Only insert statement is
supported now. at ".
I am looking for some help in this regard. TIA

Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
")";

Thanks,
Elakiya


Re: Error in /jars/upload curl request

2023-11-06 Thread Chen Yu
Hi Tauseef,

Adding an @ sign before the path will resolve your problem.
I verified that both the Web UI and Postman upload the jar file properly on the
master branch code.
If you are still having problems, please provide some more detailed
information.
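
For example, an upload command along these lines should work (the jar path and
the JobManager host/port are placeholders for your setup; the shape of the
command follows the pattern shown in the Flink REST API documentation):

    # POST the jar as a multipart form field named "jarfile";
    # the @ prefix makes curl attach the file contents rather than the literal path.
    curl -X POST -H "Expect:" \
         -F "jarfile=@/path/to/your-flink-job.jar" \
         http://<jobmanager-host>:8081/jars/upload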

Here is the relevant excerpt from `man curl`:

   -F, --form
          (HTTP SMTP IMAP) For HTTP protocol family, this lets curl emulate a
          filled-in form in which a user has pressed the submit button. This
          causes curl to POST data using the Content-Type multipart/form-data
          according to RFC 2388.

          For SMTP and IMAP protocols, this is the means to compose a multipart
          mail message to transmit.

          This enables uploading of binary files etc. To force the 'content'
          part to be a file, prefix the file name with an @ sign. To just get
          the content part from a file, prefix the file name with the symbol <.
          The difference between @ and < is then that @ makes a file get
          attached in the post as a file upload, while the < makes a text field
          and just get the contents for that text field from a file.


Best,
Yu Chen

From: Tauseef Janvekar
Sent: November 6, 2023 22:27
To: user@flink.apache.org
Subject: Error in /jars/upload curl request

I am using a curl request to upload a jar but it throws the below error:

Received unknown attribute jarfile.

Not sure what is wrong here. I am following the standard documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/

Please let me know if I have to use some other command to upload a jar using
the "/jars/upload" endpoint.

I also tried to upload using the Web UI, but it hangs continuously and only
calls the GET API with a 200 success - https://flink-nyquist.hvreaning.com/jars

Thanks,
Tauseef


Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread Chen Yu
Hi Arjun,

If you can filter files by a regex pattern, I think the config option
`source.path.regex-pattern`[1] may be what you want.


  'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter files to read under the
                                        -- directory of the `path` option. This regex pattern should be
                                        -- matched with the absolute file path. If this option is set,
                                        -- the connector will recursively read all files under the
                                        -- directory of the `path` option
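
Applied to your DDL, it could look like the following sketch (the pattern value
is just an assumed example that keeps only files named like data_*.csv):

    CREATE TABLE sample (
      col1 STRING,
      col2 STRING,
      col3 STRING,
      col4 STRING,
      `file.path` STRING NOT NULL METADATA
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///home/techuser/inputdata',
      'format' = 'csv',
      'source.monitor-interval' = '1',
      -- assumed example pattern: only absolute paths ending in data_<something>.csv are read
      'source.path.regex-pattern' = '.*/data_.*\.csv'
    );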

Best,
Yu Chen


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/


From: arjun s
Sent: November 6, 2023 20:50
To: user@flink.apache.org
Subject: Handling Schema Variability and Applying Regex Patterns in Flink Job
Configuration
Configuration

Hi team,
I'm currently utilizing the Table API function within my Flink job, with the 
objective of reading records from CSV files located in a source directory. To 
obtain the file names, I'm creating a table and specifying the schema using the 
Table API in Flink. Consequently, when the schema matches, my Flink job 
successfully submits and executes as intended. However, in cases where the 
schema does not match, the job fails to submit. Given that the schema of the 
files in the source directory is unpredictable, I'm seeking a method to handle 
this situation.
Create table query
=
CREATE TABLE sample (col1 STRING, col2 STRING, col3 STRING, col4 STRING,
`file.path` STRING NOT NULL METADATA) WITH ('connector' = 'filesystem',
'path' = 'file:///home/techuser/inputdata', 'format' = 'csv',
'source.monitor-interval' = '1')
=

Furthermore, I have a question about whether there's a way to read files from 
the source directory based on a specific regex pattern. This is relevant in our 
situation because only file names that match a particular pattern need to be 
processed by the Flink job.

Thanks and Regards,
Arjun


Re: Auditing sink using table api

2023-11-04 Thread Chen Yu
Hi Bo,

How about writing the data to the Print connector[1] simultaneously via
insertInto[2]? It will print the data to the TaskManager's log.
Of course, you can choose an appropriate connector according to your audit log
storage.
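
A minimal sketch of that idea (table and column names are placeholders; it
assumes a TableEnvironment named tEnv, an already registered JDBC sink table
called jdbc_sink, and the Flink 1.18 Table API):

    // Audit sink: every row written to it is printed to the TaskManager log.
    tEnv.executeSql(
        "CREATE TABLE audit_print (id STRING, name STRING) WITH ('connector' = 'print')");

    Table result = tEnv.sqlQuery("SELECT id, name FROM source_table");

    // Emit the same rows to both the real sink and the audit sink in one job.
    StatementSet set = tEnv.createStatementSet();
    set.add(result.insertInto("jdbc_sink"));     // existing JdbcDynamicTableSink
    set.add(result.insertInto("audit_print"));   // audit copy
    set.execute();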

Best,
Yu Chen

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/print/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/common/#emit-a-table


From: Bo <99...@qq.com>
Sent: November 4, 2023 13:53
To: user
Subject: Auditing sink using table api

Hello community,

I am looking for a way to perform auditing of the various sinks (mostly
JdbcDynamicTableSink) using the Table API.
By "auditing", I mean logging the details of every row coming into the sink,
and any anomalies when the sink writes to external systems.

Does Flink have some kind of auditing mechanism in place? The only way I can
see now is to make a custom sink that supports detailed logging to external
systems.

Any thoughts/suggestions?

Regards,

Bo