Re: Zigzag shape in TM JVM used memory

2021-04-08 Thread Piotr Nowojski
Hi,

I don't think there is a Flink-specific answer to this question. Just do
what you would normally do with any Java application running inside a
JVM. If there is an OOM on heap space, you can either try to bump the heap
space or reduce its usage. The only Flink-specific part is probably that
you need to leave enough memory for the framework itself, and that there
are a couple of different memory pools. You can read about those things in
the docs:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_tuning.html
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_trouble.html
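
As a rough starting point on the TaskManager side, the knobs from those pages
end up in flink-conf.yaml. A minimal sketch (illustrative values only, not a
recommendation for your workload):

# give the whole TaskManager process more memory and let Flink derive the pools
taskmanager.memory.process.size: 4096m

# ...or, instead of the line above, size the JVM heap for user code directly
# taskmanager.memory.task.heap.size: 2048m
# taskmanager.memory.managed.size: 1024m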

Piotrek



czw., 8 kwi 2021 o 02:19 Lu Niu  napisał(a):

> Hi, Piotr
>
> Thanks for replying. I asked this because such a pattern might imply memory
> oversubscription. For example, I tuned down the memory of one app from heap
> 2.63GB to 367MB and the job still runs fine:
> before:
>
> https://drive.google.com/file/d/1o8k9Vv3yb5gXITi4GvmlXMteQcRfmOhr/view?usp=sharing
>
> after:
>
> https://drive.google.com/file/d/1wNTHBT8aSJaAmL1rVY8jUkdp-G5znnMN/view?usp=sharing
>
>
> What's the best practice for tuning Flink job memory?
>
> 1. What’s a good start point users should try first?
> 2. How to make progress? e.g. flink application Foo currently encountered
> error OOM: java heap space. Where to move next? Simply bump up
> taskmanager.memory? Or just increase the heap?
> 3. What’s the final state? Job running fine and ensuring XYZ headroom in
> each memory component?
>
> Best
> Lu
>
> On Tue, Apr 6, 2021 at 12:26 AM Piotr Nowojski 
> wrote:
>
> > Hi,
> >
> > this should be posted on the user mailing list not the dev.
> >
> > Apart from that, this looks like normal/standard behaviour of JVM, and
> has
> > very little to do with Flink. Garbage Collector (GC) is kicking in when
> > memory usage is approaching some threshold:
> > https://www.google.com/search?q=jvm+heap+memory+usage&tbm=isch
> >
> > Piotrek
> >
> >
> > pon., 5 kwi 2021 o 22:54 Lu Niu  napisał(a):
> >
> > > Hi,
> > >
> > > we need to update our email system then :) . Here are the links:
> > >
> > >
> > >
> >
> https://drive.google.com/file/d/1lZ5_P8_NqsN1JeLzutGj4DxkyWJN75mR/view?usp=sharing
> > >
> > >
> > >
> >
> https://drive.google.com/file/d/1J6c6rQJwtDp1moAGlvQyLQXTqcuG4HjL/view?usp=sharing
> > >
> > >
> > >
> >
> https://drive.google.com/file/d/1-R2KzsABC471AEjkF5qTm5O3V47cpbBV/view?usp=sharing
> > >
> > > All are DataStream job.
> > >
> > > Best
> > > Lu
> > >
> > > On Sun, Apr 4, 2021 at 9:17 PM Yun Gao  wrote:
> > >
> > > >
> > > > Hi Lu,
> > > >
> > > > The image could not be shown due to the mail server limitation;
> > > > could you upload it somewhere and paste the link here?
> > > >
> > > > Logically, I think the zigzag is usually due to small objects being
> > > > created and garbage-collected soon after in the heap. Are you running
> > > > a SQL job or a DataStream job?
> > > >
> > > > Best,
> > > > Yun
> > > >
> > > > --
> > > > Sender:Lu Niu
> > > > Date:2021/04/05 12:06:24
> > > > Recipient:d...@flink.apache.org
> > > > Theme:Zigzag shape in TM JVM used memory
> > > >
> > > > Hi, Flink dev
> > > >
> > > > We observed that the TM JVM used memory metric shows zigzag shape
> among
> > > > lots of our applications, although these applications are quite
> > different
> > > > in business logic. The upper bound is close to the max heap size. Is
> > this
> > > > expected in flink application? Or does flink internally
> > > > aggressively pre-allocate memory?
> > > >
> > > > app1
> > > > [image: Screen Shot 2021-04-04 at 8.46.45 PM.png]
> > > > app2
> > > > [image: Screen Shot 2021-04-04 at 8.45.35 PM.png]
> > > > app3
> > > > [image: Screen Shot 2021-04-04 at 8.43.53 PM.png]
> > > >
> > > > Best
> > > > Lu
> > > >
> > > >
> > >
> >
>


Re: Flink 1.12.2 sql api use parquet format error

2021-04-08 Thread Timo Walther

Hi,

can you check the content of the JAR file that you are submitting? There 
should be a `META-INF/services` directory with an 
`org.apache.flink.table.factories.Factory` file that should list the 
Parquet format.


See also here:

https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/overview/#transform-table-connectorformat-resources
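
As a quick sanity check you could also run something like the following with
the same classpath as your job jar and look for 'parquet' in the output. This
is only a hedged sketch (the class name is made up), not an official tool:

import java.util.ServiceLoader;
import org.apache.flink.table.factories.Factory;

public class ListTableFactories {
    public static void main(String[] args) {
        // prints the identifier of every table factory the ServiceLoader can discover
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier());
        }
    }
}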

Regards,
Timo


On 06.04.21 10:25, ?? wrote:

ref: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html

env and error:

Flink version: 1.12.2
deployment: standalone kubernetes session
dependency:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_2.11</artifactId>
            <version>1.12.2</version>
        </dependency>

Error:
Caused by: org.apache.flink.table.api.ValidationException: Could not 
find any format factory for identifier 'parquet' in the classpath. at 
org.apache.flink.table.filesystem.FileSystemTableSink.<init>(FileSystemTableSink.java:124) 
at 
org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:85)






Re: Compression with rocksdb backed state

2021-04-08 Thread Timo Walther

Hi Deepthi,

1. Correct
2. Correct
3. Incremental snapshots simply manage references to RocksDB's sstables. 
You can find a full explanation here [1]. Thus, the payload is a 
blackbox for Flink and Flink's compression flag has no impact. So we 
fully rely on what RocksDB offers.

4. Correct

I hope this helps.

Regards,
Timo


[1] 
https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
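
For point 1, a minimal sketch of turning the flag on (snappy is the default
codec; as explained above, the flag has no effect on RocksDB incremental
snapshots):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableSnapshotCompression {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // compression of full snapshots is disabled by default
        env.getConfig().setUseSnapshotCompression(true);
        // ... define and execute the job as usual
    }
}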



On 07.04.21 22:04, deepthi Sridharan wrote:
I am trying to understand this section on compression of checkpoints 
which has me a bit confused 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression 



Could you please confirm if my understanding is correct:
1. Compression is disabled by default for full snapshots and needs to be 
turned on if required.
2. Current versions do not support changing the compression type (snappy 
by default) when enabled. Future versions will support configuring the 
compression type.

3. Compression is enabled by default for incremental snapshot.
4. The compression type cannot be configured for incremental savepoints.

--
Regards,
Deepthi




Re: SingleValueAggFunction received more than one element error with LISTAGG

2021-04-08 Thread Timo Walther

Hi,

which Flink version are you using?

Could you also share the resulting plan with us using 
`TableEnvironment.explainSql()`?
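
For reference, a minimal sketch of how to print the plan (the table
registration and query are placeholders; this assumes Flink 1.11+ where
explainSql is available):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainListAggQuery {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // register node, M_TermNode, M_TermData and M_Vocabulary first ...
        System.out.println(tEnv.explainSql("SELECT ... your LISTAGG query ..."));
    }
}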


Thanks,
Timo


On 07.04.21 17:29, soumoks123 wrote:

I receive the following error when trying to use the LISTAGG function in
Table API.


java.lang.RuntimeException: SingleValueAggFunction received more than one
element.
 at GroupAggsHandler$1460.accumulate(Unknown Source)
 at
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:158)
 at
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
 at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
 at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
 at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
 at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)


This is my query,

SELECT node.nid,
(SELECT LISTAGG(DISTINCT(TDC.name)) FROM M_TermNode TNC JOIN M_TermData TDC
ON TNC.tid = TDC.tid WHERE node.nid = TNC.nid AND TDC.vid = (SELECT
Vocab.vid from M_Vocabulary Vocab where Vocab.`module` = 'extra_fields')) AS
characteristics
FROM node


The above query looks slightly complex but essentially boils down to a group
by on TDC.vid. This may return more than one value for TDC.name which needs
to be concatenated into the same string.


I have tried removing the DISTINCT clause inside LISTAGG but to no avail.

The same query works on a MySQL DB with GROUP_CONCAT instead of LISTAGG.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Arvid Heise
Hi Vijay,

edit: After re-reading your message: are you sure that you restart from a
checkpoint/savepoint? If you just start the application anew and use LATEST
initial position, this is the expected behavior.
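
For reference, a sketch of where the initial position applies (stream name and
region are placeholders): it is only consulted when the consumer starts without
restored state; after restoring from a checkpoint/savepoint the consumer
resumes from the shard sequence numbers stored in that snapshot.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisSourceSketch {
    public static FlinkKinesisConsumer<String> build() {
        Properties props = new Properties();
        props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        // only used when there is NO restored state for this source
        props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        return new FlinkKinesisConsumer<>("your-stream", new SimpleStringSchema(), props);
    }
}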

--- original intended answer if you restart from checkpoint

this is definitively not the expected behavior.

To exclude certain error sources:
- Could you double-check if this is also happening if you don't use
unaligned checkpoints? (I don't really think this is because of unaligned
checkpoint, but it's better to be sure and we want to reduce the possible
error sources)
- Can you see the missing messages still in Kinesis?
- Could you extract all log INFO statements from
org.apache.flink.streaming.connectors.kinesis and attach them here?
- How long did you wait with recovery?



On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav 
wrote:

> Hi Team,
>
> We are trying to make sure we are not losing data when KINESIS Consumer is
> down.
>
> Kinesis streaming Job which has following checkpointing properties:
>
>
> *// checkpoint every X msecs
> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());*
>
>
>
>
>
>
>
>
> *// enable externalized checkpoints which are retained after job
> cancellation
> env.getCheckpointConfig().enableExternalizedCheckpoints(
> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
>   );// allow job recovery fallback to checkpoint when there is a more
> recent savepoint
> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>  //
> enables the experimental unaligned checkpoints
> env.getCheckpointConfig().enableUnalignedCheckpoints();*
>
> *//checkpointpath*
> *env.setStateBackend(new
> FsStateBackend(Conf.getFlinkCheckPointPath(), true));*
>
> 1) We killed the Kinesis Job
> 2) Sent messages to KDS while Consumer was down.
> 3) Restarted Flink Consumer, *messages which were sent during the
> Consumer down period, never ingested (data loss).*
> 4) Re-sent messages to KDS while the consumer was still up. Messages did
> ingest fine.
>
> *How can I avoid data loss for #3 ??*
>
> From Logs:
>
>
> *2021-04-07 12:15:49,161 INFO
>  org.apache.flink.runtime.jobmaster.JobMaster  - Using
> application-defined state backend: File State Backend (checkpoints:
> 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous:
> TRUE, fileStateThreshold: -1)*
>
>
>
> *2021-04-07 12:16:02,343 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591
> ms).2021-04-07 12:16:11,951 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job
> 8943d16e22b8aaf65d6b9e2b8bd54113.2021-04-07 12:16:12,483 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411
> ms).*
>
> Thanks,
> Vijay
>


Re: flink cdc postgres connector 1.1 - heartbeat.interval.ms - WAL consumption

2021-04-08 Thread bat man
Anyone who has faced similar issues with cdc with Postgres.

I see the restart_lsn and confirmed_flush_lsn constant since the snapshot
replication records were streamed even though I have tried inserting
a record in the whitelisted table.

select * from pg_replication_slots;
  slot_name  |  plugin  | slot_type | datoid | database | temporary |
active | active_pid | xmin | catalog_xmin | restart_lsn |
confirmed_flush_lsn
-+--+---++--+---+++--+--+-+-
 stream_cdc3 | pgoutput | logical   |  16411 | test_cdc | f | t
 |   1146 |  | 6872 | 62/34000828 | 62/34000860

I have passed heartbeat.interval.ms = 1000 and could see the heartbeat
events streamed to Flink; however, the transaction log disk usage and the
oldest replication slot lag are consistently increasing. From [1] I have also
tried this -

For other decoder plug-ins, it is recommended to create a supplementary
table that is not monitored by Debezium.

A separate process would then periodically update the table (either
inserting a new event or updating the same row all over). PostgreSQL then
will invoke Debezium which will confirm the latest LSN and allow the
database to reclaim the WAL space.
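
A minimal sketch of that workaround (table name, JDBC URL and credentials are
made up; it just touches a dummy row periodically so that a fresh WAL entry is
produced and the slot's confirmed LSN can advance):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HeartbeatTableUpdater {
    public static void main(String[] args) {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            try (Connection c = DriverManager.getConnection(
                         "jdbc:postgresql://host:5432/test_cdc", "user", "pwd");
                 PreparedStatement ps = c.prepareStatement(
                         "INSERT INTO flink_cdc_heartbeat (id, ts) VALUES (1, now()) "
                                 + "ON CONFLICT (id) DO UPDATE SET ts = EXCLUDED.ts")) {
                ps.executeUpdate(); // a tiny write that produces a fresh WAL entry
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, 60, TimeUnit.SECONDS);
    }
}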

[image: Screenshot 2021-04-08 at 2.07.18 PM.png]

[image: Screenshot 2021-04-08 at 2.07.52 PM.png]

[1] -
https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space

Thanks.

On Wed, Apr 7, 2021 at 12:51 PM bat man  wrote:

> Hi there,
>
> I am using flink 1.11 and cdc connector 1.1 to stream changes from a
> postgres table. I see the WAL consumption is increasing gradually even
> though the writes to the tables are very infrequent.
>
> I am using AWS RDS, from [1] I understand that setting the parameter
> heartbeat.interval.ms solves this WAL consumption issue. However, I tried
> setting this with no success.
>
> I found a bug [2] which seems to be taking care of committing the lsn for
> the db to release the wal. however this seems to be only fixed in 1.3 which
> is compatible with flink 1.12.1. Is there any way this can be fixed in
> 1.11.1. Since I am using EMR and the latest flink version available is 1.11.
>
>
> [1] -
> https://debezium.io/documentation/reference/connectors/postgresql.html
> [2] - https://github.com/ververica/flink-cdc-connectors/issues/97
>
> Thanks.
> Hemant
>


Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Flavio Pompermaier
Any help here? Moreover if I use the DataStream APIs there's no left/right
outer join yet..are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier 
wrote:

> Hi to all,
> I'm testing writing to a CSV using Flink 1.13 and I get the following
> error:
>
> The matching candidates:
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> Unsupported property keys:
> format.quote-character
>
> I create the table env using this:
>
> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
> .useBlinkPlanner()//
> // .inBatchMode()//
> .inStreamingMode()//
> .build();
> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>
> The error is the same both with inBatchMode and inStreamingMode.
> Is this really not supported or am I using the wrong API?
>
> Best,
> Flavio
>


Why does flink-quickstart-scala suggests adding connector dependencies in the default scope, while Flink Hive integration docs suggest the opposite

2021-04-08 Thread Yik San Chan
The question is cross-posted on Stack Overflow
https://stackoverflow.com/questions/67001326/why-does-flink-quickstart-scala-suggests-adding-connector-dependencies-in-the-de
.

## Connector dependencies should be in default scope

This is what [flink-quickstart-scala](
https://github.com/apache/flink/blob/d12eeedfac6541c3a0711d1580ce3bd68120ca90/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml#L84)
suggests:

```
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
```

It also aligns with [Flink project configuration](
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies
):

> We recommend packaging the application code and all its required
dependencies into one jar-with-dependencies which we refer to as the
application jar. The application jar can be submitted to an already running
Flink cluster, or added to a Flink application container image.
>
> Important: For Maven (and other build tools) to correctly package the
dependencies into the application jar, these application dependencies must
be specified in scope compile (unlike the core dependencies, which must be
specified in scope provided).

## Hive connector dependencies should be in provided scope

However, [Flink Hive Integration docs](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven)
suggests the opposite:

> If you are building your own program, you need the following dependencies
in your mvn file. It’s recommended not to include these dependencies in the
resulting jar file. You’re supposed to add dependencies as stated above at
runtime.

## Why?

Thanks!

Best,
Yik San


Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread 太平洋
My application program looks like this. Does this structure have some problem?


public class StreamingJob {
    public static void main(String[] args) throws Exception {
        int i = 0;
        while (i < 100) {
            try {
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                env.setParallelism(Parallelism);

                EnvironmentSettings bsSettings =
                        EnvironmentSettings.newInstance().useBlinkPlanner()
                                .inStreamingMode().build();
                StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

                bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
                Table t = bsTableEnv.sqlQuery(query);

                DataStream points = bsTableEnv.toAppendStream(t, DataPoint.class);

                DataStream weightPoints = points.map();

                DataStream predictPoints = weightPoints.keyBy()
                        .reduce().map();

                // side output
                final OutputTag outPutPredict = new OutputTag("predict") {
                };

                SingleOutputStreamOperator mainDataStream = predictPoints
                        .process();

                DataStream exStream = mainDataStream.getSideOutput(outPutPredict);

                // write data to clickhouse
                String insertIntoCKSql = "xxx";
                mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
                        new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
                                .withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));

                // write data to kafka
                FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
                exStream.map().addSink(producer);

                env.execute("Prediction Program");
            } catch (Exception e) {
                e.printStackTrace();
            }
            i++;
            Thread.sleep(window * 1000);
        }
    }
}

 > > [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
 > >
 > > Best,
 > > Yangze Guo
 > >
 > > Best,
 > > Yangze Guo
 > >
 > >
 > > On Tue, Apr 6, 2021 at 4:22 PM 太平洋 <495635...@qq.com> wrote:
 > > >
 > > > batch job:
 > > > read data from s3 by sql, then by some operators and write data to clickhouse and kafka.
 > > > after some times, task-manager quit with OutOfMemoryError: Metaspace.
 > > >
 > > > env:
 > > > flink version: 1.12.2
 > > > task-manager slot count: 5
 > > > deployment: standalone kubernetes session
 > > > dependencies:

Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread Yangze Guo
IIUC, your program will eventually create 100 ChildFirstClassLoaders in
a TM. But they should always be GC'ed once the job finishes. So, as Arvid said,
you'd better check who is still referencing those ChildFirstClassLoaders.


Best,
Yangze Guo

On Thu, Apr 8, 2021 at 5:43 PM 太平洋 <495635...@qq.com> wrote:
>
> My application program looks like this. Does this structure have some problem?
>
> public class StreamingJob {
> public static void main(String[] args) throws Exception {
> int i = 0;
> while (i < 100) {
> try {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> env.setParallelism(Parallelism);
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner()
> .inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, 
> bsSettings);
>
> bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
> Table t = bsTableEnv.sqlQuery(query);
>
> DataStream points = bsTableEnv.toAppendStream(t, DataPoint.class);
>
> DataStream weightPoints = points.map();
>
> DataStream predictPoints = weightPoints.keyBy()
> .reduce().map();
>
> // side output
> final OutputTag outPutPredict = new 
> OutputTag("predict") {
> };
>
> SingleOutputStreamOperator mainDataStream = predictPoints
> .process();
>
> DataStream exStream = 
> mainDataStream.getSideOutput(outPutPredict);
>
> //write data to clickhouse
> String insertIntoCKSql = "xxx";
> mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
> new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
> new 
> JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)
> .withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));
>
> // write data to kafka
> FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
> exStream.map().addSink(producer);
>
> env.execute("Prediction Program");
> } catch (Exception e) {
> e.printStackTrace();
> }
> i++;
> Thread.sleep(window * 1000);
> }
> }
> }
>
>
>
> -- Original Message --
> From: "Arvid Heise" ;
> Sent: 2021-04-08 (Thu) 2:33 PM
> To: "Yangze Guo";
> Cc: "太平洋"<495635...@qq.com>; "user"; "guowei.mgw"; "renqschn";
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> Hi,
>
> ChildFirstClassLoaders are created (more or less) per application jar, and
> seeing so many looks like a classloader leak to me. I'd expect you to see a
> new ChildFirstClassLoader popping up with each new job submission.
>
> Can you check who is referencing the ChildFirstClassLoader transitively? 
> Usually, it's some thread that is lingering around because some third party 
> library is leaking threads etc.
>
> OneInputStreamTask is legit and just indicates that you have a job running 
> with 4 slots on that TM. It should not hold any dedicated metaspace memory.
>
> On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo  wrote:
>>
>> I went through the JM & TM logs but could not find any valuable clue.
>> The exception is actually thrown by kafka-producer-network-thread.
>> Maybe @Qingsheng could also take a look?
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
>> >
>> > I have configured to 512M, but problem still exist. Now the memory size is 
>> > still 256M.
>> > Attachments are TM and JM logs.
>> >
>> > Look forward to your reply.
>> >
>> > -- Original Message --
>> > From: "Yangze Guo" ;
>> > Sent: 2021-04-06 (Tue) 6:35 PM
>> > To: "太平洋"<495635...@qq.com>;
>> > Cc: "user"; "guowei.mgw";
>> > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>> >
>> > > I have tried this method, but the problem still exist.
>> > How much memory do you configure for it?
>> >
>> > > is 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal
>> > Not quite sure about it. AFAIK, each job will have a classloader.
>> > Multiple tasks of the same job in the same TM will share the same
>> > classloader. The classloader will be removed if there is no more task
>> > running on the TM. Classloader without reference will be finally
>> > cleanup by GC. Could you share JM and TM logs for further analysis?
>> > I'll also involve @Guowei Ma in this thread.
>> >
>> >
>> > Best,
>> > Yangze Guo
>> >
>> > On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
>> > >
>> > > I have tried this method, but the problem still exist.
>> > > by heap dump analysis, is 21 instances of 
>> > > "org.apache.flink.util.ChildFirstClassLoader" normal?
>> > >
>> > >
>> > > -- Original Message --
>> > > From: "Yangze Guo" ;
>> > > Sent: 2021-04-06 (Tue) 4:32 PM
>> > > To: "太平洋"<495635...@qq.com>;
>> > > Cc: "user";
>> > > Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>> > >
>> > > I think you can try to increase the JVM metaspace option for
>> > > TaskManagers through taskmanager.memory.jvm-metaspace.size.

Re: flink cdc postgres connector 1.1 - heartbeat.interval.ms - WAL consumption

2021-04-08 Thread Till Rohrmann
Hi Hemant,

I am pulling in Jark who is most familiar with Flink's cdc connector. He
might also be able to tell whether the fix can be backported.

Cheers,
Till

On Thu, Apr 8, 2021 at 10:42 AM bat man  wrote:

> Anyone who has faced similar issues with cdc with Postgres.
>
> I see the restart_lsn and confirmed_flush_lsn constant since the snapshot
> replication records were streamed even though I have tried inserting
> a record in the whitelisted table.
>
> select * from pg_replication_slots;
>   slot_name  |  plugin  | slot_type | datoid | database | temporary |
> active | active_pid | xmin | catalog_xmin | restart_lsn |
> confirmed_flush_lsn
>
> -+--+---++--+---+++--+--+-+-
>  stream_cdc3 | pgoutput | logical   |  16411 | test_cdc | f | t
>|   1146 |  | 6872 | 62/34000828 | 62/34000860
>
> I have passed the  heartbeat.interval.ms = 1000 and could see
> the heartbeat events streamed to flink however the transaction log disk
> usage and oldest replication slot lag consistently increasing. From [1] I
> have also tried this -
>
> For other decoder plug-ins, it is recommended to create a supplementary
> table that is not monitored by Debezium.
>
> A separate process would then periodically update the table (either
> inserting a new event or updating the same row all over). PostgreSQL then
> will invoke Debezium which will confirm the latest LSN and allow the
> database to reclaim the WAL space.
>
> [image: Screenshot 2021-04-08 at 2.07.18 PM.png]
>
> [image: Screenshot 2021-04-08 at 2.07.52 PM.png]
>
> [1] -
> https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space
>
> Thanks.
>
> On Wed, Apr 7, 2021 at 12:51 PM bat man  wrote:
>
>> Hi there,
>>
>> I am using flink 1.11 and cdc connector 1.1 to stream changes from a
>> postgres table. I see the WAL consumption is increasing gradually even
>> though the writes to tables are very less.
>>
>> I am using AWS RDS, from [1] I understand that setting the parameter
>> heartbeat.interval.ms solves this WAL consumption issue. However, I
>> tried setting this with no success.
>>
>> I found a bug [2] which seems to be taking care of committing the lsn for
>> the db to release the wal. however this seems to be only fixed in 1.3 which
>> is compatible with flink 1.12.1. Is there any way this can be fixed in
>> 1.11.1. Since I am using EMR and the latest flink version available is 1.11.
>>
>>
>> [1] -
>> https://debezium.io/documentation/reference/connectors/postgresql.html
>> [2] - https://github.com/ververica/flink-cdc-connectors/issues/97
>>
>> Thanks.
>> Hemant
>>
>


Re: Reducing Task Manager Count Greatly Increases Savepoint Restore

2021-04-08 Thread Till Rohrmann
Hi Kevin,

when decreasing the TaskManager count I assume that you also decrease the
parallelism of the Flink job. There are three aspects which can then cause
a slower recovery.

1) Each Task gets a larger key range assigned. Therefore, each TaskManager
has to download more data in order to restart the Task. Moreover, there are
fewer nodes downloading larger portions of the data (less parallelization).
2) If you rescaled the parallelism, then it can happen that a Task gets a
key range assigned which requires downloading of multiple key range parts
from the previous run/savepoint. The new key range might not need all the
data from the savepoint parts and hence you download some data which is not
really used in the end.
3) When rescaling the job, then Flink has to rebuild the RocksDB instance
which is an expensive and slow operation. What happens is that Flink
creates for every savepoint part which it needs for its key range a RocksDB
instance and then extracts the part which is only relevant for its key
range into a new RocksDB instance. This causes a lot of read and write
amplification.

Cheers,
Till

On Wed, Apr 7, 2021 at 4:07 PM Kevin Lam  wrote:

> Hi all,
>
> We are trying to benchmark savepoint size vs. restore time.
>
> One thing we've observed is that when we reduce the number of task
> managers, the time to restore from a savepoint increases drastically:
>
> 1/ Restoring from 9.7tb savepoint onto 156 task managers takes 28 minutes
> 2/ Restoring from the same savepoint onto 30 task managers takes over 3
> hours
>
> *Is this expected? How does the restore process work? Is this just a
> matter of having lower restore parallelism for 30 task managers vs 156 task
> managers? *
>
> Some details
>
> - Running on kubernetes
> - Used Rocksdb with a local ssd for state backend
> - Savepoint is hosted on GCS
> - The smaller task manager case is important to us because we expect to
> deploy our application with a high number of task managers, and downscale
> once a backfill is completed
>
> Differences between 1/ and 2/:
>
> 2/ has decreased task manager count 156 -> 30
> 2/ has decreased operator parallelism by a factor of ~10
> 2/ uses a striped SSD (3 ssds mounted as a single logical volume) to hold
> rocksdb files
>
> Thanks in advance for your help!
>


Re: UniqueKey constraint is lost with multiple sources join in SQL

2021-04-08 Thread Kai Fu
As identified with the community, it's a bug; more information is in issue
https://issues.apache.org/jira/browse/FLINK-22113

On Sat, Apr 3, 2021 at 8:43 PM Kai Fu  wrote:

> Hi team,
>
> We have a use case to join multiple data sources to generate a
> continuously updated view. We defined primary key constraints on all the input
> sources and all the keys are the subsets in the join condition. All joins
> are left join.
>
> In our case, the first two inputs can produce *JoinKeyContainsUniqueKey *input
> spec, which is good and performant. However, when it comes to the third input
> source, it's joined with the intermediate output table of the first two
> input tables, and the intermediate table does not carry key constraint
> information(although the thrid source input table does), so it results in a
> *NoUniqueKey* input spec. Given NoUniqueKey inputs have dramatic
> performance implications per the Force Join Unique Key
> 
> email thread, we want to know if there is any mitigation plan for this.
>
> One solution I can come up with is to write the intermediate result into
> some place like Kafka with unique constraint and join with the
> third source, while it requires extra resources. Any other suggestion on
> this? Thanks.
>
> --
> *Best regards,*
> *- Kai*
>


-- 
*Best wishes,*
*- Kai*


Re: flink cdc postgres connector 1.1 - heartbeat.interval.ms - WAL consumption

2021-04-08 Thread bat man
Thanks Till.

Hi Jark,

Any inputs, going through the code of 1.1 and 1.3 in the meantime.

Thanks,
Hemant

On Thu, Apr 8, 2021 at 3:52 PM Till Rohrmann  wrote:

> Hi Hemant,
>
> I am pulling in Jark who is most familiar with Flink's cdc connector. He
> might also be able to tell whether the fix can be backported.
>
> Cheers,
> Till
>
> On Thu, Apr 8, 2021 at 10:42 AM bat man  wrote:
>
>> Anyone who has faced similar issues with cdc with Postgres.
>>
>> I see the restart_lsn and confirmed_flush_lsn constant since the snapshot
>> replication records were streamed even though I have tried inserting
>> a record in the whitelisted table.
>>
>> select * from pg_replication_slots;
>>   slot_name  |  plugin  | slot_type | datoid | database | temporary |
>> active | active_pid | xmin | catalog_xmin | restart_lsn |
>> confirmed_flush_lsn
>>
>> -+--+---++--+---+++--+--+-+-
>>  stream_cdc3 | pgoutput | logical   |  16411 | test_cdc | f | t
>>|   1146 |  | 6872 | 62/34000828 | 62/34000860
>>
>> I have passed the  heartbeat.interval.ms = 1000 and could see
>> the heartbeat events streamed to flink however the transaction log disk
>> usage and oldest replication slot lag consistently increasing. From [1] I
>> have also tried this -
>>
>> For other decoder plug-ins, it is recommended to create a supplementary
>> table that is not monitored by Debezium.
>>
>> A separate process would then periodically update the table (either
>> inserting a new event or updating the same row all over). PostgreSQL then
>> will invoke Debezium which will confirm the latest LSN and allow the
>> database to reclaim the WAL space.
>>
>> [image: Screenshot 2021-04-08 at 2.07.18 PM.png]
>>
>> [image: Screenshot 2021-04-08 at 2.07.52 PM.png]
>>
>> [1] -
>> https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space
>>
>> Thanks.
>>
>> On Wed, Apr 7, 2021 at 12:51 PM bat man  wrote:
>>
>>> Hi there,
>>>
>>> I am using flink 1.11 and cdc connector 1.1 to stream changes from a
>>> postgres table. I see the WAL consumption is increasing gradually even
>>> though the writes to tables are very less.
>>>
>>> I am using AWS RDS, from [1] I understand that setting the parameter
>>> heartbeat.interval.ms solves this WAL consumption issue. However, I
>>> tried setting this with no success.
>>>
>>> I found a bug [2] which seems to be taking care of committing the lsn
>>> for the db to release the wal. however this seems to be only fixed in 1.3
>>> which is compatible with flink 1.12.1. Is there any way this can be fixed
>>> in 1.11.1. Since I am using EMR and the latest flink version available is
>>> 1.11.
>>>
>>>
>>> [1] -
>>> https://debezium.io/documentation/reference/connectors/postgresql.html
>>> [2] - https://github.com/ververica/flink-cdc-connectors/issues/97
>>>
>>> Thanks.
>>> Hemant
>>>
>>


Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Till Rohrmann
Hi Flavio,

I tried to execute the code snippet you have provided and I could not
reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working
example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier 
wrote:

> Any help here? Moreover if I use the DataStream APIs there's no left/right
> outer join yet..are those meant to be added in Flink 1.13 or 1.14?
>
> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> I'm testing writing to a CSV using Flink 1.13 and I get the following
>> error:
>>
>> The matching candidates:
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>> Unsupported property keys:
>> format.quote-character
>>
>> I create the table env using this:
>>
>> final EnvironmentSettings envSettings =
>> EnvironmentSettings.newInstance()//
>> .useBlinkPlanner()//
>> // .inBatchMode()//
>> .inStreamingMode()//
>> .build();
>> final TableEnvironment tableEnv =
>> TableEnvironment.create(envSettings);
>>
>> The error is the same both with inBatchMode and inStreamingMode.
>> Is this really not supported or am I using the wrong API?
>>
>> Best,
>> Flavio
>>
>


Re: Why does flink-quickstart-scala suggests adding connector dependencies in the default scope, while Flink Hive integration docs suggest the opposite

2021-04-08 Thread Till Rohrmann
Hi Yik San,

for future reference, I copy my answer from the SO here:

The reason for this difference is that for Hive it is recommended to start
the cluster with the respective Hive dependencies. The documentation [1]
states that it's best to put the dependencies into the lib directory before
you start the cluster. That way the cluster is enabled to run jobs which
use Hive. At the same time, you don't have to bundle this dependency in the
user jar which reduces its size. However, there shouldn't be anything
preventing you from bundling the Hive dependency with your user code if you
want to.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#dependencies

Cheers,
Till

On Thu, Apr 8, 2021 at 11:41 AM Yik San Chan 
wrote:

> The question is cross-posted on Stack Overflow
> https://stackoverflow.com/questions/67001326/why-does-flink-quickstart-scala-suggests-adding-connector-dependencies-in-the-de
> .
>
> ## Connector dependencies should be in default scope
>
> This is what [flink-quickstart-scala](
> https://github.com/apache/flink/blob/d12eeedfac6541c3a0711d1580ce3bd68120ca90/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml#L84)
> suggests:
>
> ```
> <!-- Add connector dependencies here. They must be in the default scope (compile). -->
> ```
>
> It also aligns with [Flink project configuration](
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies
> ):
>
> > We recommend packaging the application code and all its required
> dependencies into one jar-with-dependencies which we refer to as the
> application jar. The application jar can be submitted to an already running
> Flink cluster, or added to a Flink application container image.
> >
> > Important: For Maven (and other build tools) to correctly package the
> dependencies into the application jar, these application dependencies must
> be specified in scope compile (unlike the core dependencies, which must be
> specified in scope provided).
>
> ## Hive connector dependencies should be in provided scope
>
> However, [Flink Hive Integration docs](
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven)
> suggests the opposite:
>
> > If you are building your own program, you need the following
> dependencies in your mvn file. It’s recommended not to include these
> dependencies in the resulting jar file. You’re supposed to add dependencies
> as stated above at runtime.
>
> ## Why?
>
> Thanks!
>
> Best,
> Yik San
>


Flink does not cleanup some disk memory after submitting jar over rest

2021-04-08 Thread Great Info
I have deployed my own flink setup in AWS ECS. One Service for JobManager
and one Service for task Managers. I am running one ECS task for a job
manager and 3 ecs tasks for TASK managers.

I have a kind of batch job whose jar I upload using the Flink REST API every
day with new arguments. Each time I submit it, disk usage grows by ~600MB.
Checkpoints go to S3, and I have also set
*historyserver.archive.clean-expired-jobs* to true.

Since I am running on ECS, I am not able to find out why the disk usage
increases on every jar upload and execution.

What are the flink config params I should look at to make sure the memory
is not shooting up?


Re: Compression with rocksdb backed state

2021-04-08 Thread deepthi Sridharan
Thank you, that makes sense.

On Thu, Apr 8, 2021 at 12:37 AM Timo Walther  wrote:

> Hi Deepthi,
>
> 1. Correct
> 2. Correct
> 3. Incremental snapshots simply manage references to RocksDB's sstables.
> You can find a full explanation here [1]. Thus, the payload is a
> blackbox for Flink and Flink's compression flag has no impact. So we
> fully rely on what RocksDB offers.
> 4. Correct
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> [1]
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>
>
> On 07.04.21 22:04, deepthi Sridharan wrote:
> > I am trying to understand this section on compression of checkpoints
> > which has me a bit confused
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
> > <
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
> >
> >
> > Could you please confirm if my understanding is correct:
> > 1. Compression is disabled by default for full snapshots and needs to be
> > turned on if required.
> > 2. Current versions do not support changing the compression type (snappy
> > by default) when enabled. Future versions will support configuring the
> > compression type.
> > 3. Compression is enabled by default for incremental snapshot.
> > 4. The compression type cannot be configured for incremental savepoints.
> >
> > --
> > Regards,
> > Deepthi
>
>

-- 
Regards,
Deepthi


Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Vijayendra Yadav
Hi Arvid,

Thanks for your response. I did not restart from the checkpoint. I assumed
Flink would look for a checkpoint upon restart automatically.

*I should restart like below ?*

bin/flink run  -s
s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
\

Thanks,
Vijay

On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise  wrote:

> Hi Vijay,
>
> edit: After re-reading your message: are you sure that you restart from a
> checkpoint/savepoint? If you just start the application anew and use LATEST
> initial position, this is the expected bahvior.
>
> --- original intended answer if you restart from checkpoint
>
> this is definitively not the expected behavior.
>
> To exclude certain error sources:
> - Could you double-check if this is also happening if you don't use
> unaligned checkpoints? (I don't really think this is because of unaligned
> checkpoint, but it's better to be sure and we want to reduce the possible
> error sources)
> - Can you see the missing messages still in Kinesis?
> - Could you extract all log INFO statements from
> org.apache.flink.streaming.connectors.kinesis and attach them here?
> - How long did you wait with recovery?
>
>
>
> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav 
> wrote:
>
>> Hi Team,
>>
>> We are trying to make sure we are not losing data when KINESIS Consumer
>> is down.
>>
>> Kinesis streaming Job which has following checkpointing properties:
>>
>>
>> *// checkpoint every X msecs
>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());*
>>
>>
>>
>>
>>
>>
>>
>>
>> *// enable externalized checkpoints which are retained after job
>> cancellation
>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
>>   );// allow job recovery fallback to checkpoint when there is a more
>> recent savepoint
>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>  //
>> enables the experimental unaligned checkpoints
>> env.getCheckpointConfig().enableUnalignedCheckpoints();*
>>
>> *//checkpointpath*
>> *env.setStateBackend(new
>> FsStateBackend(Conf.getFlinkCheckPointPath(), true));*
>>
>> 1) We killed the Kinesis Job
>> 2) Sent messages to KDS while Consumer was down.
>> 3) Restarted Flink Consumer, *messages which were sent during the
>> Consumer down period, never ingested (data loss).*
>> 4) Re-sent messages to KDS while the consumer was still up. Messages did
>> ingest fine.
>>
>> *How can I avoid data loss for #3 ??*
>>
>> From Logs:
>>
>>
>> *2021-04-07 12:15:49,161 INFO
>>  org.apache.flink.runtime.jobmaster.JobMaster  - Using
>> application-defined state backend: File State Backend (checkpoints:
>> 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous:
>> TRUE, fileStateThreshold: -1)*
>>
>>
>>
>> *2021-04-07 12:16:02,343 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>> checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591
>> ms).2021-04-07 12:16:11,951 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>> checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job
>> 8943d16e22b8aaf65d6b9e2b8bd54113.2021-04-07 12:16:12,483 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>> checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411
>> ms).*
>>
>> Thanks,
>> Vijay
>>
>


Re: Flink does not cleanup some disk memory after submitting jar over rest

2021-04-08 Thread Maciek Próchniak

Hi,

don't know if this is the problem you're facing, but some time ago we 
encountered two issues connected to REST API and increased disk usage 
after each submission:


https://issues.apache.org/jira/browse/FLINK-21164

https://issues.apache.org/jira/browse/FLINK-9844

- they're closed ATM, but only 1.12.2 contains the fixes.
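
In the meantime, the jars uploaded through the REST API are kept on the
JobManager's local disk. Assuming default settings on 1.12.x, the location is
controlled by the options below, so pointing them at a dedicated volume that
you can watch (and clean) may help narrow this down; the paths are only
examples:

web.tmpdir: /mnt/flink-web-tmp
web.upload.dir: /mnt/flink-web-uploads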


maciek


On 08.04.2021 19:52, Great Info wrote:


I have deployed my own flink setup in AWS ECS. One Service for 
JobManager and one Service for task Managers. I am running one ECS 
task for a job manager and 3 ecs tasks for TASK managers.


I have a kind of batch job which I upload using flink rest every-day 
with changing new arguments, when I submit each time disk memory gets 
increased by ~ 600MB, I have given a checkpoint as S3 . Also I have 
set *historyserver.archive.clean-expired-jobs* true .


Since I am running on ECS, I am not able to find why the memory is 
getting increased on every jar upload and execution .


What are the flink config params I should look at to make sure the 
memory is not shooting up?




Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Flavio Pompermaier
Hi Till,
since I was using the same WITH-clause both for reading and writing I
discovered that overwrite is actually supported in the Sinks, while in the
Sources an exception is thrown (I was thinking that those properties were
simply ignored).
However, the quote-character is not supported in the sinks: is this a bug or
is it the intended behaviour?
Here is a minimal example that reproduce the problem (put in the
/tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {
  public static void main(String[] args) throws Exception {
final EnvironmentSettings envSettings =

EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
// ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
// BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
final String tableInName = "testTableIn";
final String createInTableDdl = getSourceDdl(tableInName,
"/tmp/test.csv"); //

final String tableOutName = "testTableOut";
final String createOutTableDdl = getSinkDdl(tableOutName,
"/tmp/test-out.csv"); //
tableEnv.executeSql(createInTableDdl);
tableEnv.executeSql(createOutTableDdl);

Table tableIn = tableEnv.from(tableInName);
Table tableOut = tableEnv.from(tableOutName);
tableIn.insertInto(tableOutName);
// tableEnv.toDataSet(table, Row.class).print();
tableEnv.execute("TEST read/write");

  }

  private static String getSourceDdl(String tableName, String filePath) {
return "CREATE TABLE " + tableName + " (\n" + //
" `id` BIGINT,\n" + //
" `name` STRING) WITH (\n" + //
" 'connector.type' = 'filesystem',\n" + //
" 'connector.property-version' = '1',\n" + //
" 'connector.path' = '" + filePath + "',\n" + //
" 'format.type' = 'csv',\n" + //
" 'format.field-delimiter' = ',',\n" + //
 //   " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
" 'format.property-version' = '1',\n" + //
" 'format.quote-character' = '\"',\n" + //
" 'format.ignore-first-line' = 'false'" + //
")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
return "CREATE TABLE " + tableName + " (\n" + //
" `id` BIGINT,\n" + //
" `name` STRING) WITH (\n" + //
" 'connector.type' = 'filesystem',\n" + //
" 'connector.property-version' = '1',\n" + //
" 'connector.path' = '" + filePath + "',\n" + //
" 'format.type' = 'csv',\n" + //
" 'format.field-delimiter' = ',',\n" + //
" 'format.num-files' = '1',\n" + //
" 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
" 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
" 'format.property-version' = '1'\n" + //
")";
  }
}
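
For comparison, here is a sketch of the same sink declared with the newer
'filesystem' connector options (assuming Flink 1.12+ option names; I have not
verified how the quote character behaves there). It is meant to be dropped into
the FlinkCsvTest class above:

  private static String getNewStyleSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        " `id` BIGINT,\n" + //
        " `name` STRING) WITH (\n" + //
        " 'connector' = 'filesystem',\n" + //
        " 'path' = '" + filePath + "',\n" + //
        " 'format' = 'csv',\n" + //
        " 'csv.field-delimiter' = ',',\n" + //
        " 'csv.quote-character' = '\"'\n" + //
        ")";
  }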

Thanks for the support,
Flavio


On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann  wrote:

> Hi Flavio,
>
> I tried to execute the code snippet you have provided and I could not
> reproduce the problem.
>
> Concretely I am running this code:
>
> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>
> tableEnv.fromValues("foobar").execute().await();
>
> Am I missing something? Maybe you can share a minimal but fully working
> example where the problem occurs. Thanks a lot.
>
> Cheers,
> Till
>
> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier 
> wrote:
>
>> Any help here? Moreover if I use the DataStream APIs there's no
>> left/right outer join yet..are those meant to be added in Flink 1.13 or
>> 1.14?
>>
>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> I'm testing writing to a CSV using Flink 1.13 and I get the following
>>> error:
>>>
>>> The matching candidates:
>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>> Unsupported property keys:
>>> format.quote-character
>>>
>>> I create the table env using this:
>>>
>>> final EnvironmentSettings envSettings =
>>> EnvironmentSettings.newInstance()//
>>> .useBlinkPlanner()//
>>> // .inBatchMode()//
>>> .inStreamingMode()//
>>> .build();
>>> final TableEnvironment tableEnv =
>>> TableEnvironment.create(envSettings);
>>>
>>> The error is the same both with inBatchMode and inStreamingMode.
>>> Is this really not supported or am I using the wrong API?
>>>
>>> Best,
>>> Flavio
>>>
>>


Re: 回复: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread Maciek Próchniak

Hi,

Did you put the clickhouse JDBC driver on Flink main classpath (in lib 
folder) and not in user-jar - as described here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code?


When we encountered Metaspace leaks recently, in quite a few cases it 
turned out that the problem was a JDBC driver in the user classloader which 
was registered by DriverManager and caused a classloader leak.
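
A quick, hedged way to check for that pattern (the class name is made up; run
it with your job jar on the classpath, or log the same from a RichFunction's
open() method): print which classloader registered each JDBC driver.

import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Collections;

public class ListJdbcDrivers {
    public static void main(String[] args) {
        for (Driver driver : Collections.list(DriverManager.getDrivers())) {
            System.out.println(driver.getClass().getName()
                    + " loaded by " + driver.getClass().getClassLoader());
        }
    }
}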



maciek


On 08.04.2021 11:42, 太平洋 wrote:
My application program looks like this. Does this structure have some 
problem?


public class StreamingJob {
public static void main(String[] args) throws Exception {
int i = 0;
while (i < 100) {
try {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(Parallelism);

EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner()

.inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, 
bsSettings);


bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
Table t = bsTableEnv.sqlQuery(query);

DataStream points = bsTableEnv.toAppendStream(t, 
DataPoint.class);


DataStream weightPoints = points.map();

DataStream predictPoints = weightPoints.keyBy()
.reduce().map();

// side output
final OutputTag outPutPredict = new 
OutputTag("predict") {

};

SingleOutputStreamOperator mainDataStream = predictPoints
.process();

DataStream exStream = 
mainDataStream.getSideOutput(outPutPredict);


// write data to clickhouse
String insertIntoCKSql = "xxx";
mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql, new CkSinkBuilder(),
new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
new 
JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(CkDriverName)

.withUrl(CkUrl).withUsername(CkUser).withPassword(CkPassword).build()));

// write data to kafka
FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
exStream.map().addSink(producer);

env.execute("Prediction Program");
} catch (Exception e) {
e.printStackTrace();
}
i++;
Thread.sleep(window * 1000);
}
}
}



-- Original Message --
From: "Arvid Heise" ;
Sent: 2021-04-08 (Thu) 2:33 PM
To: "Yangze Guo";
Cc: "太平洋"<495635...@qq.com>; "user"; "guowei.mgw"; "renqschn";
Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem

Hi,

ChildFirstClassLoader are created (more or less) by application jar 
and seeing so many looks like a classloader leak to me. I'd expect you 
to see a new ChildFirstClassLoader popping up with each new job 
submission.


Can you check who is referencing the ChildFirstClassLoader 
transitively? Usually, it's some thread that is lingering around 
because some third party library is leaking threads etc.


OneInputStreamTask is legit and just indicates that you have a job 
running with 4 slots on that TM. It should not hold any dedicated 
metaspace memory.


On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo  wrote:


I went through the JM & TM logs but could not find any valuable clue.
The exception is actually thrown by kafka-producer-network-thread.
Maybe @Qingsheng could also take a look?


Best,
Yangze Guo

On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
>
> I have configured to 512M, but problem still exist. Now the
memory size is still 256M.
> Attachments are TM and JM logs.
>
> Look forward to your reply.
>
> -- Original Message --
> From: "Yangze Guo" <karma...@gmail.com>;
> Sent: 2021-04-06 (Tue) 6:35 PM
> To: "太平洋"<495635...@qq.com>;
> Cc: "user"<user@flink.apache.org>; "guowei.mgw"<guowei@gmail.com>;
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> > I have tried this method, but the problem still exist.
> How much memory do you configure for it?
>
> > is 21 instances of
"org.apache.flink.util.ChildFirstClassLoader" normal
> Not quite sure about it. AFAIK, each job will have a classloader.
> Multiple tasks of the same job in the same TM will share the same
> classloader. The classloader will be removed if there is no more
task
> running on the TM. Classloader without reference will be finally
> cleanup by GC. Could you share JM and TM logs for further analysis?
> I'll also involve @Guowei Ma in this thread.
>
>
> Best,
> Yangze Guo
>
> On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
> >
> > I have tried this method, but the problem still exist.

Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Vijayendra Yadav
Thanks, it was working fine with: bin/flink run  -s
s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
\

On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav 
wrote:

> Hi Arvid,
>
> Thanks for your response. I did not restart from the checkpoint. I assumed
> Flink would look for a checkpoint upon restart automatically.
>
> *I should restart like below ?*
>
> bin/flink run  -s
> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
> \
>
> Thanks,
> Vijay
>
> On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise  wrote:
>
>> Hi Vijay,
>>
>> edit: After re-reading your message: are you sure that you restart from a
>> checkpoint/savepoint? If you just start the application anew and use LATEST
>> initial position, this is the expected behavior.
>>
>> --- original intended answer if you restart from checkpoint
>>
>> this is definitively not the expected behavior.
>>
>> To exclude certain error sources:
>> - Could you double-check if this is also happening if you don't use
>> unaligned checkpoints? (I don't really think this is because of unaligned
>> checkpoint, but it's better to be sure and we want to reduce the possible
>> error sources)
>> - Can you see the missing messages still in Kinesis?
>> - Could you extract all log INFO statements from
>> org.apache.flink.streaming.connectors.kinesis and attach them here?
>> - How long did you wait with recovery?
>>
>>
>>
>> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Team,
>>>
>>> We are trying to make sure we are not losing data when KINESIS Consumer
>>> is down.
>>>
>>> Kinesis streaming Job which has following checkpointing properties:
>>>
>>>
>>> *// checkpoint every X msecs
>>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());*
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *// enable externalized checkpoints which are retained after job
>>> cancellation
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
>>>   );// allow job recovery fallback to checkpoint when there is a more
>>> recent savepoint
>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>>  //
>>> enables the experimental unaligned checkpoints
>>> env.getCheckpointConfig().enableUnalignedCheckpoints();*
>>>
>>> *//checkpointpath*
>>> *env.setStateBackend(new
>>> FsStateBackend(Conf.getFlinkCheckPointPath(), true));*
>>>
>>> 1) We killed the Kinesis Job
>>> 2) Sent messages to KDS while Consumer was down.
>>> 3) Restarted Flink Consumer, *messages which were sent during the
>>> Consumer down period, never ingested (data loss).*
>>> 4) Re-sent messages to KDS while the consumer was still up. Messages did
>>> ingest fine.
>>>
>>> *How can I avoid data loss for #3 ??*
>>>
>>> From Logs:
>>>
>>>
>>> *2021-04-07 12:15:49,161 INFO
>>>  org.apache.flink.runtime.jobmaster.JobMaster  - Using
>>> application-defined state backend: File State Backend (checkpoints:
>>> 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous:
>>> TRUE, fileStateThreshold: -1)*
>>>
>>>
>>>
>>> *2021-04-07 12:16:02,343 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>>> checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591
>>> ms).2021-04-07 12:16:11,951 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>> checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job
>>> 8943d16e22b8aaf65d6b9e2b8bd54113.2021-04-07 12:16:12,483 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>>> checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411
>>> ms).*
>>>
>>> Thanks,
>>> Vijay
>>>
>>


Re: Async + Broadcast?

2021-04-08 Thread Alex Cruise
Thanks Arvid! I'm not completely clear on where to apply your suggestions.

I've included a sketch of my job below, and I have a couple questions:

1. It looks like enableObjectReuse() is a global setting, should I worry
about whether I'm using any mutable data between stages?
2. Should I disableChaining() on BOTH broadcast-dependent stages, or just
the one immediately preceding the async?

Thanks!

-0xe1a

*Types:*

/** all the configs for a given tenant, as of the time when a change was
observed */
data class ConfigSnapshot(
  tenantId: Long,
  timestamp: Instant,
  configs: Map
)

/** parse raw strings from input, rejecting those for unconfigured tenants
*/
class Parse(
  initialConfigs: Map
) : BroadcastProcessFunction {
  override fun processBroadcastElement(
value: ConfigSnapshot,
ctx: Context,
out: Collector
  ) {
val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
snapshots.put(value.tenantId, value)
  }

  override fun processElement(value: String, ctx: ReadOnlyContext, out:
Collector) {
val validTenantIds = ctx.getBroadcastState(configSnapshotDescriptor)
  .toMap()
  .keys
  .ifEmpty { initialConfigs.keys }

val parsed = Record(value)
if (!validTenantIds.contains(parsed.tenantId)) {
  return
} else {
  out.collect(parsed)
}
  }
}

/** given a parsed record, identify which config(s) are interested in it,
and send an output value of the record tupled with the interested config */
class ValidateAndDistribute(
  initialConfigs: Map
) : BroadcastProcessFunction> {
  override fun processBroadcastElement(
value: ConfigSnapshot,
ctx: Context,
out: Collector>
  ) {
val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
snapshots.put(value.tenantId, value)
  }

  override fun processElement(
value: Record,
ctx: ReadOnlyContext,
out: Collector>
  ) {
val configsForThisTenant =
ctx.getBroadcastState(configSnapshotDescriptor)
  .toMap()
  .ifEmpty { initialConfigs }
  .get(value.tenantId)
  .configs
  .orEmpty()

val configsInterestedInThisRecord = configsForThisTenant.values.filter
{
  it.interestedIn(value)
}

for ((configId, config) in configsInterestedInThisRecord) {
  out.collect(value to config)
}
  }
}

/** given a pair of Record and Config, run the async operation and send an
enriched record including the result */
class Enrich() : RichAsyncFunction, EnrichedRecord>

*Job Pseudocode:*

val initialConfigs: Map = ???
val dataSource: DataStream = ???
val configSource: DataStream = ??? // from a legacy "while
(true) { poll; sleep }" source

// the config-subscribing operators keep the broadcast state in a
Map
val configSnapshotDescriptor = MapStateDescriptor(
  "currentConfigSnapshots",
  Long::class.java,
  ConfigSnapshot::class.java
)

// Broadcast the snapshots
val configBroadcast: BroadcastStream =
configSource.broadcast(configSnapshotDescriptor)

val parsed: DataStream = dataSource
  .connect(configBroadcast)
  .process(Parse(initialConfigs))

// input records can be duplicated now, as there may be multiple Configs
that are interested in a record
val validated: DataStream> = parsed
  .connect(configBroadcast)
  .process(ValidateAndDistribute(initialConfigs))

val enriched: DataStream = AsyncDataStream.unorderedWait(
  validated,
  Enrich(),
  5L,
  TimeUnit.SECONDS
)





On Wed, Apr 7, 2021 at 11:28 PM Arvid Heise  wrote:

> Hi Alex,
>
> your approach is completely valid. What you want to achieve is that you
> have a chain between your state managing operator and the consuming async
> operations. In that way, you have no serialization overhead.
>
> To achieve that you want to
> - use Flink 1.11+ [1]
> - make sure that if you have a legacy source, you disableChaining before
> your state managing operator as asyncIO cannot be (transitively) chained to
> legacy sources. So it should be source -> ... -> (forward channel) ->
> (state managing operator -> async1 -> async2 -> ... ) ... -> sink
> - enableObjectReuse [2] to avoid copying of objects
>
> [1] https://issues.apache.org/jira/browse/FLINK-16219
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/execution_configuration.html
>
> On Thu, Apr 8, 2021 at 3:26 AM Alex Cruise  wrote:
>
>> Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :)
>>
>> -0xe1a
>>
>> On Wed, Apr 7, 2021 at 2:18 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey Alex,
>>>
>>> I'm not sure if there is a best practice here, but what I can tell you
>>> is that I worked on a job that did exactly what you're suggesting with a
>>> non-async operator to create a [record, config] tuple, which was then
>>> passed to the async stage. Our config objects were also not tiny (~500kb)
>>> and our pipeline not huge (~1M records/day and 1GB data/ day), but this
>>> setup worked quite well. I'd say if latency isn't your most important
>>> metric, or if your pipeline is a sim

Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Kurt Young
Hi Flavio,

We would recommend you to use new table source & sink interfaces, which
have different
property keys compared to the old ones, e.g. 'connector' v.s.
'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should
work just fine.

*Flink SQL> set table.dml-sync=true;*

*[INFO] Session property has been set.*


*Flink SQL> select * from csv;*

*+--+--+*

*|   id | name |*

*+--+--+*

*|3 |c |*

*+--+--+*

*Received a total of 1 row*


*Flink SQL> insert overwrite csv values(4, 'd');*

*[INFO] Submitting SQL update statement to the cluster...*

*[INFO] Execute statement in sync mode. Please wait for the execution
finish...*

*[INFO] Complete execution of the SQL update statement.*


*Flink SQL> select * from csv;*

*+--+--+*

*|   id | name |*

*+--+--+*

*|4 |d |*

*+--+--+*

*Received a total of 1 row*

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
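
For an illustrative sketch (not from this thread), the same style can also be
used from Java; the table name, path and option values below are placeholders,
and the option keys follow that page:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NewStyleCsvSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // New-style filesystem/csv properties; note the format-prefixed csv.* options.
        tableEnv.executeSql(
            "CREATE TABLE csv_out (\n"
                + "  `id` BIGINT,\n"
                + "  `name` STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'filesystem',\n"
                + "  'path' = '/tmp/test-out',\n"
                + "  'format' = 'csv',\n"
                + "  'csv.field-delimiter' = ',',\n"
                + "  'csv.quote-character' = '\"'\n"
                + ")");

        // Submits an asynchronous INSERT job writing csv files under the configured path.
        tableEnv.executeSql("INSERT INTO csv_out VALUES (1, 'hello'), (2, 'hi')");
    }
}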

Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier 
wrote:

> Hi Till,
> since I was using the same WITH-clause both for reading and writing I
> discovered that overwrite is actually supported in the Sinks, while in the
> Sources an exception is thrown (I was thinking that those properties were
> simply ignored).
> However, the quote-character is not supported in the sinks: is this a bug
> or is it the intended behaviour?
> Here is a minimal example that reproduces the problem (put something like
> '1,hello' or '2,hi' in /tmp/test.csv).
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class FlinkCsvTest {
>   public static void main(String[] args) throws Exception {
> final EnvironmentSettings envSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
> // ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
> final String tableInName = "testTableIn";
> final String createInTableDdl = getSourceDdl(tableInName,
> "/tmp/test.csv"); //
>
> final String tableOutName = "testTableOut";
> final String createOutTableDdl = getSinkDdl(tableOutName,
> "/tmp/test-out.csv"); //
> tableEnv.executeSql(createInTableDdl);
> tableEnv.executeSql(createOutTableDdl);
>
> Table tableIn = tableEnv.from(tableInName);
> Table tableOut = tableEnv.from(tableOutName);
> tableIn.insertInto(tableOutName);
> // tableEnv.toDataSet(table, Row.class).print();
> tableEnv.execute("TEST read/write");
>
>   }
>
>   private static String getSourceDdl(String tableName, String filePath) {
> return "CREATE TABLE " + tableName + " (\n" + //
> " `id` BIGINT,\n" + //
> " `name` STRING) WITH (\n" + //
> " 'connector.type' = 'filesystem',\n" + //
> " 'connector.property-version' = '1',\n" + //
> " 'connector.path' = '" + filePath + "',\n" + //
> " 'format.type' = 'csv',\n" + //
> " 'format.field-delimiter' = ',',\n" + //
>  //   " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
> " 'format.property-version' = '1',\n" + //
> " 'format.quote-character' = '\"',\n" + //
> " 'format.ignore-first-line' = 'false'" + //
> ")";
>   }
>
>   private static String getSinkDdl(String tableName, String filePath) {
> return "CREATE TABLE " + tableName + " (\n" + //
> " `id` BIGINT,\n" + //
> " `name` STRING) WITH (\n" + //
> " 'connector.type' = 'filesystem',\n" + //
> " 'connector.property-version' = '1',\n" + //
> " 'connector.path' = '" + filePath + "',\n" + //
> " 'format.type' = 'csv',\n" + //
> " 'format.field-delimiter' = ',',\n" + //
> " 'format.num-files' = '1',\n" + //
> " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
> " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
> " 'format.property-version' = '1'\n" + //
> ")";
>   }
> }
>
> Thanks for the support,
> Flavio
>
>
> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann  wrote:
>
>> Hi Flavio,
>>
>> I tried to execute the code snippet you have provided and I could not
>> reproduce the problem.
>>
>> Concretely I am running this code:
>>
>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .bu

Re: Why does flink-quickstart-scala suggests adding connector dependencies in the default scope, while Flink Hive integration docs suggest the opposite

2021-04-08 Thread Yik San Chan
Hi Till, I have 2 follow-ups.

(1) Why is Hive special, while for connectors such as kafka, the docs
suggest simply bundling the kafka connector dependency with my user code?

(2) it seems the document misses the "before you start the cluster" part -
does it always require a cluster restart whenever the /lib directory
changes?

Thanks.

Best,
Yik San

On Fri, Apr 9, 2021 at 1:07 AM Till Rohrmann  wrote:

> Hi Yik San,
>
> for future reference, I copy my answer from the SO here:
>
> The reason for this difference is that for Hive it is recommended to start
> the cluster with the respective Hive dependencies. The documentation [1]
> states that it's best to put the dependencies into the lib directory before
> you start the cluster. That way the cluster is enabled to run jobs which
> use Hive. At the same time, you don't have to bundle this dependency in the
> user jar which reduces its size. However, there shouldn't be anything
> preventing you from bundling the Hive dependency with your user code if you
> want to.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#dependencies
>
> Cheers,
> Till
>
> On Thu, Apr 8, 2021 at 11:41 AM Yik San Chan 
> wrote:
>
>> The question is cross-posted on Stack Overflow
>> https://stackoverflow.com/questions/67001326/why-does-flink-quickstart-scala-suggests-adding-connector-dependencies-in-the-de
>> .
>>
>> ## Connector dependencies should be in default scope
>>
>> This is what [flink-quickstart-scala](
>> https://github.com/apache/flink/blob/d12eeedfac6541c3a0711d1580ce3bd68120ca90/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml#L84)
>> suggests:
>>
>> ```
>> 
>>
>> 
>> ```
>>
>> It also aligns with [Flink project configuration](
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies
>> ):
>>
>> > We recommend packaging the application code and all its required
>> dependencies into one jar-with-dependencies which we refer to as the
>> application jar. The application jar can be submitted to an already running
>> Flink cluster, or added to a Flink application container image.
>> >
>> > Important: For Maven (and other build tools) to correctly package the
>> dependencies into the application jar, these application dependencies must
>> be specified in scope compile (unlike the core dependencies, which must be
>> specified in scope provided).
>>
>> ## Hive connector dependencies should be in provided scope
>>
>> However, [Flink Hive Integration docs](
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven)
>> suggests the opposite:
>>
>> > If you are building your own program, you need the following
>> dependencies in your mvn file. It’s recommended not to include these
>> dependencies in the resulting jar file. You’re supposed to add dependencies
>> as stated above at runtime.
>>
>> ## Why?
>>
>> Thanks!
>>
>> Best,
>> Yik San
>>
>


Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Kurt Young
My DDL is:

CREATE TABLE csv (
   id BIGINT,
   name STRING
) WITH (
   'connector' = 'filesystem',
   'path' = '.',
   'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:

> Hi Flavio,
>
> We would recommend you to use new table source & sink interfaces, which
> have different
> property keys compared to the old ones, e.g. 'connector' v.s.
> 'connector.type'.
>
> You can follow the 1.12 doc [1] to define your csv table, everything
> should work just fine.
>
> *Flink SQL> set table.dml-sync=true;*
>
> *[INFO] Session property has been set.*
>
>
> *Flink SQL> select * from csv;*
>
> *+--+--+*
>
> *|   id | name |*
>
> *+--+--+*
>
> *|3 |c |*
>
> *+--+--+*
>
> *Received a total of 1 row*
>
>
> *Flink SQL> insert overwrite csv values(4, 'd');*
>
> *[INFO] Submitting SQL update statement to the cluster...*
>
> *[INFO] Execute statement in sync mode. Please wait for the execution
> finish...*
>
> *[INFO] Complete execution of the SQL update statement.*
>
>
> *Flink SQL> select * from csv;*
>
> *+--+--+*
>
> *|   id | name |*
>
> *+--+--+*
>
> *|4 |d |*
>
> *+--+--+*
>
> *Received a total of 1 row*
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier 
> wrote:
>
>> Hi Till,
>> since I was using the same WITH-clause both for reading and writing I
>> discovered that overwrite is actually supported in the Sinks, while in the
>> Sources an exception is thrown (I was thinking that those properties were
>> simply ignored).
>> However, the quote-character is not supported in the sinks: is this a bug
>> or is it the intended behaviour?
>> Here is a minimal example that reproduces the problem (put something like
>> '1,hello' or '2,hi' in /tmp/test.csv).
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> public class FlinkCsvTest {
>>   public static void main(String[] args) throws Exception {
>> final EnvironmentSettings envSettings =
>>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> final TableEnvironment tableEnv =
>> TableEnvironment.create(envSettings);
>> // ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>> // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>> final String tableInName = "testTableIn";
>> final String createInTableDdl = getSourceDdl(tableInName,
>> "/tmp/test.csv"); //
>>
>> final String tableOutName = "testTableOut";
>> final String createOutTableDdl = getSinkDdl(tableOutName,
>> "/tmp/test-out.csv"); //
>> tableEnv.executeSql(createInTableDdl);
>> tableEnv.executeSql(createOutTableDdl);
>>
>> Table tableIn = tableEnv.from(tableInName);
>> Table tableOut = tableEnv.from(tableOutName);
>> tableIn.insertInto(tableOutName);
>> // tableEnv.toDataSet(table, Row.class).print();
>> tableEnv.execute("TEST read/write");
>>
>>   }
>>
>>   private static String getSourceDdl(String tableName, String filePath) {
>> return "CREATE TABLE " + tableName + " (\n" + //
>> " `id` BIGINT,\n" + //
>> " `name` STRING) WITH (\n" + //
>> " 'connector.type' = 'filesystem',\n" + //
>> " 'connector.property-version' = '1',\n" + //
>> " 'connector.path' = '" + filePath + "',\n" + //
>> " 'format.type' = 'csv',\n" + //
>> " 'format.field-delimiter' = ',',\n" + //
>>  //   " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>> " 'format.property-version' = '1',\n" + //
>> " 'format.quote-character' = '\"',\n" + //
>> " 'format.ignore-first-line' = 'false'" + //
>> ")";
>>   }
>>
>>   private static String getSinkDdl(String tableName, String filePath) {
>> return "CREATE TABLE " + tableName + " (\n" + //
>> " `id` BIGINT,\n" + //
>> " `name` STRING) WITH (\n" + //
>> " 'connector.type' = 'filesystem',\n" + //
>> " 'connector.property-version' = '1',\n" + //
>> " 'connector.path' = '" + filePath + "',\n" + //
>> " 'format.type' = 'csv',\n" + //
>> " 'format.field-delimiter' = ',',\n" + //
>> " 'format.num-files' = '1',\n" + //
>> " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks
>> only)
>> " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
>> " 'format.property-version' = '1'\n" + //
>> ")";
>>

Re: Re: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread ??????
I have tried to add 'classloader.parent-first-patterns.additional:
"ru.yandex.clickhouse"' to flink-conf, but the problem still exists.
Is there a lightweight way to put the ClickHouse JDBC driver into Flink's lib/ folder?
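
As a side note, a hedged sketch (not from this thread) of ruling out the
DriverManager registration leak Maciek describes below without touching lib/:
deregister drivers that were loaded by the user-code classloader when the
function closes. The class and its logic are illustrative only.

import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Enumeration;

import org.apache.flink.api.common.functions.RichMapFunction;

public class JdbcAwareMapper extends RichMapFunction<String, String> {

    @Override
    public String map(String value) {
        return value; // placeholder logic; real code would use a JDBC connection here
    }

    @Override
    public void close() throws Exception {
        // Drop drivers registered by this task's user-code classloader from DriverManager;
        // otherwise the static registration keeps the classloader (and its Metaspace) alive.
        ClassLoader userClassLoader = getClass().getClassLoader();
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
            Driver driver = drivers.nextElement();
            if (driver.getClass().getClassLoader() == userClassLoader) {
                DriverManager.deregisterDriver(driver);
            }
        }
        super.close();
    }
}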
 


------------------ Original message ------------------
From: "Maciek Próchniak"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code?

When we encountered Metaspace leaks recently, in quite a few cases it
turned out that the problem was the JDBC driver in the user classloader,
which was registered by DriverManager and caused a classloader leak.

maciek


On 08.04.2021 11:42, ?? wrote:

My application program looks like this. Does this structure have some
problem?

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        int i = 0;
        while (i < 100) {
            try {
                StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                env.setParallelism(Parallelism);

                EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
                StreamTableEnvironment bsTableEnv =
                    StreamTableEnvironment.create(env, bsSettings);

                bsTableEnv.executeSql("CREATE TEMPORARY TABLE ...");
                Table t = bsTableEnv.sqlQuery(query);

                DataStream ...

 > > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-metaspace
 > >
 > > Best,
 > > Yangze Guo
 > >
 > > Best,
 > > Yangze Guo
 > >
 > >
 > > On Tue, Apr 6, 2021 at 4:22 PM ?? <495635...@qq.com> wrote:
 > > >
 > > > batch job:
 > > > read data from s3 by sql, then by some operators and write data to
 > > > clickhouse and kafka.
 > > > after some time, the task-manager quits with OutOfMemoryError: Metaspace.
 > > >
 > > > env:
 > > > flink version: 1.12.2
 > > > task-manager slot count: 5
 > > > deployment: standalone kubernetes session
 > > > dependencies:
 > > >

how to submit jobs remotely when a rest proxy like nginx is used and REST endpoint is bind to loopback interface?

2021-04-08 Thread Ming Li
Hi,

The flink official document clearly states that "Simple mutual
authentication may be enabled by configuration if authentication of
connections to the REST endpoint is required, but we recommend to deploy a
“side car proxy”: Bind the REST endpoint to the loopback interface (or the
pod-local interface in Kubernetes) and start a REST proxy that
authenticates and forwards the requests to Flink. Examples for proxies that
Flink users have deployed are Envoy Proxy  or NGINX
with MOD_AUTH
."



So I am wondering: in standalone mode when HA is not enabled, a rest
proxy like nginx is used, and the rest endpoint is bound to the loopback
interface, how should we submit jobs remotely?



ps.

1. sample flink-conf.yaml settings and nginx settings are shown below:

rest.bind-port: 9091 / rest.bind-address: 127.0.0.1 (the port and ip
the rest endpoint binds itself to on the host where it is started)

rest.port: 9091 / rest.address: 127.0.0.1 (the port and ip used by
rest clients when submitting requests, so basically it should reach the above
rest.bind-port/rest.bind-address)

[image: image.png]

2. I know that we can use curl to request the nginx proxy, which
authenticates and forwards the request to flink, as shown below: curl -v
-u user1:user1 http://10.20.39.43:8080/config (which is the address that
nginx is listening on)

3. I know that we can submit jobs from the host where the job manager is
located, as shown below:

/opt/flink-1.12.2/bin/flink run -m 127.0.0.1:9091
/opt/flink-1.12.2/examples/batch/WordCount.jar --input /tmp/README.txt
--output /tmp/flink.test.txt11  ()

Thanks!
-- 
Best Regards
Michael Li


Flink Metrics emitted from a Kubernetes Application Cluster

2021-04-08 Thread Claude M
Hello,

I've setup Flink as an Application Cluster in Kubernetes.  Now I'm looking
into monitoring the Flink cluster in Datadog.  This is what is configured
in the flink-conf.yaml to emit metrics:

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator
metrics.reporter.dghttp.class:
org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: {{ datadog_api_key }}
metrics.reporter.dghttp.tags: environment: {{ environment }}

When they get to Datadog though, the metrics for flink.jobmanager and
flink.taskmanager are filtered by the host, which is the Pod IP.  However, I
would like them to use the pod name.  How can this be accomplished?


Thanks


Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Arvid Heise
Hi Vijay,

if you don't specify a checkpoint, then Flink assumes you want to start
from scratch (e.g., you had a bug in your business logic and need to start
completely without state).

If there is any failure and Flink restarts automatically, it will always
pick up from the latest checkpoint [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#recovery
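
For illustration, a minimal job-side sketch of the pieces involved (stream
name, region, interval and restart values are placeholders, not taken from
your job): the initial position only applies when the job starts without
restored state, while an automatic restart, or a manual "flink run -s
<checkpoint>", resumes from the stored shard offsets.

import java.util.Properties;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisRecoverySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic checkpoints; automatic restarts restore the latest completed one.
        env.enableCheckpointing(60_000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

        Properties consumerConfig = new Properties();
        consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-west-2");
        // Only used when there is no restored state; with LATEST, a fresh start skips
        // everything published while the job was down, which looks like data loss.
        consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

        env.addSource(new FlinkKinesisConsumer<>(
                "my-stream", new SimpleStringSchema(), consumerConfig))
            .print();

        env.execute("kinesis-recovery-sketch");
    }
}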

On Thu, Apr 8, 2021 at 11:08 PM Vijayendra Yadav 
wrote:

> Thanks it was working fine with: bin/flink run  -s
> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
> \
>
> On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav 
> wrote:
>
>> Hi Arvid,
>>
>> Thanks for your response. I did not restart from the checkpoint. I
>> assumed Flink would look for a checkpoint upon restart automatically.
>>
>> *I should restart like below ?*
>>
>> bin/flink run  -s
>> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
>> \
>>
>> Thanks,
>> Vijay
>>
>> On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise  wrote:
>>
>>> Hi Vijay,
>>>
>>> edit: After re-reading your message: are you sure that you restart from
>>> a checkpoint/savepoint? If you just start the application anew and use
>>> LATEST initial position, this is the expected behavior.
>>>
>>> --- original intended answer if you restart from checkpoint
>>>
>>> this is definitively not the expected behavior.
>>>
>>> To exclude certain error sources:
>>> - Could you double-check if this is also happening if you don't use
>>> unaligned checkpoints? (I don't really think this is because of unaligned
>>> checkpoint, but it's better to be sure and we want to reduce the possible
>>> error sources)
>>> - Can you see the missing messages still in Kinesis?
>>> - Could you extract all log INFO statements from
>>> org.apache.flink.streaming.connectors.kinesis and attach them here?
>>> - How long did you wait with recovery?
>>>
>>>
>>>
>>> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav 
>>> wrote:
>>>
 Hi Team,

 We are trying to make sure we are not losing data when KINESIS Consumer
 is down.

 Kinesis streaming Job which has following checkpointing properties:


 // checkpoint every X msecs
 env.enableCheckpointing(Conf.getFlinkCheckpointInterval());

 // enable externalized checkpoints which are retained after job cancellation
 env.getCheckpointConfig().enableExternalizedCheckpoints(
     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 // allow job recovery fallback to checkpoint when there is a more recent savepoint
 env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());

 // enables the experimental unaligned checkpoints
 env.getCheckpointConfig().enableUnalignedCheckpoints();

 // checkpoint path
 env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));

 1) We killed the Kinesis Job
 2) Sent messages to KDS while Consumer was down.
 3) Restarted Flink Consumer, *messages which were sent during the
 Consumer down period, never ingested (data loss).*
 4) Re-sent messages to KDS while the consumer was still up. Messages
 did ingest fine.

 *How can I avoid data loss for #3 ??*

 From Logs:


 *2021-04-07 12:15:49,161 INFO
  org.apache.flink.runtime.jobmaster.JobMaster  - Using
 application-defined state backend: File State Backend (checkpoints:
 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous:
 TRUE, fileStateThreshold: -1)*



 *2021-04-07 12:16:02,343 INFO
  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
 checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591
 ms).2021-04-07 12:16:11,951 INFO
  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
 checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job
 8943d16e22b8aaf65d6b9e2b8bd54113.2021-04-07 12:16:12,483 INFO
  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
 checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411
 ms).*

 Thanks,
 Vijay

>>>


Re: Async + Broadcast?

2021-04-08 Thread Arvid Heise
Hi Alex,

The easiest way to verify if what you tried is working out is to look at
Flink's Web UI and check the topology.

The broadcast side of the input will always be ... well broadcasted (=not
chained). So you need to disable chaining only on the non-broadcasted
dataset.

val parsed: DataStream = dataSource
  .disableChaining()
  .connect(configBroadcast)
  .process(Parse(initialConfigs))

Regarding objectReuse, it's safe to enable if you don't do any dirty hacks
on data that has been output already. So what you cannot do is store the
last element in your map function (without managed state) and use that to
calculate the new result.
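
For illustration, a minimal self-contained sketch (with a placeholder
pipeline) of turning it on:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Skips defensive copies between chained operators; safe as long as no function
        // caches or mutates records it has already emitted or received.
        env.getConfig().enableObjectReuse();

        env.fromElements("a", "b", "c")
            .map(String::toUpperCase)
            .print();

        env.execute("object-reuse-sketch");
    }
}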

On Fri, Apr 9, 2021 at 1:13 AM Alex Cruise  wrote:

> Thanks Arvid! I'm not completely clear on where to apply your suggestions.
>
> I've included a sketch of my job below, and I have a couple questions:
>
> 1. It looks like enableObjectReuse() is a global setting, should I worry
> about whether I'm using any mutable data between stages?
> 2. Should I disableChaining() on BOTH broadcast-dependent stages, or just
> the one immediately preceding the async?
>
> Thanks!
>
> -0xe1a
>
> *Types:*
>
> /** all the configs for a given tenant, as of the time when a change was
> observed */
> data class ConfigSnapshot(
>   tenantId: Long,
>   timestamp: Instant,
>   configs: Map
> )
>
> /** parse raw strings from input, rejecting those for unconfigured tenants
> */
> class Parse(
>   initialConfigs: Map
> ) : BroadcastProcessFunction {
>   override fun processBroadcastElement(
> value: ConfigSnapshot,
> ctx: Context,
> out: Collector
>   ) {
> val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
> snapshots.put(value.tenantId, value)
>   }
>
>   override fun processElement(value: String, ctx: ReadOnlyContext, out:
> Collector) {
> val validTenantIds = ctx.getBroadcastState(configSnapshotDescriptor)
>   .toMap()
>   .keys
>   .ifEmpty { initialConfigs.keys }
>
> val parsed = Record(value)
> if (!validTenantIds.contains(parsed.tenantId)) {
>   return
> } else {
>   out.collect(parsed)
> }
>   }
> }
>
> /** given a parsed record, identify which config(s) are interested in it,
> and send an output value of the record tupled with the interested config */
> class ValidateAndDistribute(
>   initialConfigs: Map
> ) : BroadcastProcessFunction>
> {
>   override fun processBroadcastElement(
> value: ConfigSnapshot,
> ctx: Context,
> out: Collector>
>   ) {
> val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
> snapshots.put(value.tenantId, value)
>   }
>
>   override fun processElement(
> value: Record,
> ctx: ReadOnlyContext,
> out: Collector>
>   ) {
> val configsForThisTenant =
> ctx.getBroadcastState(configSnapshotDescriptor)
>   .toMap()
>   .ifEmpty { initialConfigs }
>   .get(value.tenantId)
>   .configs
>   .orEmpty()
>
> val configsInterestedInThisRecord = configsForThisTenant.values.filter
> {
>   it.interestedIn(value)
> }
>
> for ((configId, config) in configsInterestedInThisRecord) {
>   out.collect(value to config)
> }
>   }
> }
>
> /** given a pair of Record and Config, run the async operation and send an
> enriched record including the result */
> class Enrich() : RichAsyncFunction, EnrichedRecord>
>
> *Job Pseudocode:*
>
> val initialConfigs: Map = ???
> val dataSource: DataStream = ???
> val configSource: DataStream = ??? // from a legacy "while
> (true) { poll; sleep }" source
>
> // the config-subscribing operators keep the broadcast state in a
> Map
> val configSnapshotDescriptor = MapStateDescriptor(
>   "currentConfigSnapshots",
>   Long::class.java,
>   ConfigSnapshot::class.java
> )
>
> // Broadcast the snapshots
> val configBroadcast: BroadcastStream =
> configSource.broadcast(configSnapshotDescriptor)
>
> val parsed: DataStream = dataSource
>   .connect(configBroadcast)
>   .process(Parse(initialConfigs))
>
> // input records can be duplicated now, as there may be multiple Configs
> that are interested in a record
> val validated: DataStream> = parsed
>   .connect(configBroadcast)
>   .process(ValidateAndDistribute(initialConfigs))
>
> val enriched: DataStream = AsyncDataStream.unorderedWait(
>   validated,
>   Enrich(),
>   5L,
>   TimeUnit.SECONDS
> )
>
>
>
>
>
> On Wed, Apr 7, 2021 at 11:28 PM Arvid Heise  wrote:
>
>> Hi Alex,
>>
>> your approach is completely valid. What you want to achieve is that you
>> have a chain between your state managing operator and the consuming async
>> operations. In that way, you have no serialization overhead.
>>
>> To achieve that you want to
>> - use Flink 1.11+ [1]
>> - make sure that if you have a legacy source, you disableChaining before
>> your state managing operator as asyncIO cannot be (transitively) chained to
>> legacy sources. So it should be source -> ... -> (forward channel) ->
>> (state managing operator -> async1 -> async2 -> ... ) ... -> sink
>> - enableOb

Re: Organizing Flink Applications: Mono repo or polyrepo

2021-04-08 Thread Arvid Heise
Hi Bin,

I would put Flink applications into separate repos. It reduces compile
times and makes automatic deployment much easier (if you update
master/release branch of application X, you simply deploy it - potentially
with some manual trigger in your CI/CD pipeline) . You can also easily bump
Flink versions for applications that benefit from new features.

If you have a large amount of shared code between your Flink applications
and they have similar life cycles (i.e. they are strongly coupled), then
it makes sense to put these specific applications into the same repo. A
smaller amount of shared code should reside in its own repo.

Naturally, if you have one code base for several Flink applications (e.g.
generic application that is configured), then you have only one repo. For
example, you have a generic engine to execute some SQL query on any dataset
and only need a bit of yaml to configure it, you would have one application
with X deployments. Usually, upon updating the generic application, you
want to refresh all deployments.

TL;DR only put in things into the same repo, if they share the life-cycle
to keep deployment simple.

I hope this helps,

Arvid

On Wed, Mar 31, 2021 at 3:49 AM Xinbin Huang  wrote:

> Hi community
>
> I am curious about people's experience in structuring Flink applications.
> Do you use a mono repo structure (multiple applications in one single repo)
> or broken down each application into its own repo?
>
> If possible, can you share some of your thoughts on the pros/cons of each
> approach?
>
> Thanks
> Bin
>


Re: Avro schema

2021-04-08 Thread Arvid Heise
Hi Sumeet,

The beauty of Avro lies in having reader and writer schema and schema
compatibility, such that if your schema evolves over time (which will
happen in streaming naturally but is also very common in batch), you can
still use your application as is without modification. For streaming, this
methodology also implies that you can process elements with different
schema versions in the same run, which is mandatory for any non-toy example.

If you read into this topic, you will realize that it doesn't make sense to
read from Avro without specifying your reader schema (except for some
generic applications, but they should be written in DataStream). If you
keep in mind that your same dataset could have different schemas, you will
notice that your ideas quickly reach some limitations (which schema to
take?). What you could do is write a small script to generate the
schema DDL from your current schema in your actual data if you have very
many columns and datasets. It certainly would also be an interesting idea
to pass a static Avro/Json schema to the DDL.
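
For illustration, a minimal Avro-level sketch of that resolution (schema and
file names are placeholders): the reader schema is fixed by the application,
while the writer schema travels with the data and is resolved against it.

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReaderSchemaSketch {
    public static void main(String[] args) throws Exception {
        // The schema the application was built against (reader schema).
        Schema readerSchema = new Schema.Parser().parse(new File("user-v2.avsc"));

        // The writer schema is taken from the data file header and resolved
        // against the reader schema during deserialization.
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(readerSchema);
        try (DataFileReader<GenericRecord> fileReader =
                 new DataFileReader<>(new File("users.avro"), datumReader)) {
            for (GenericRecord record : fileReader) {
                // Fields added in the reader schema get their defaults; removed ones are dropped.
                System.out.println(record);
            }
        }
    }
}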

On Fri, Apr 2, 2021 at 10:57 AM Paul Lam  wrote:

> Hi Sumeet,
>
> I’m not a Table/SQL API expert, but from my knowledge, it’s not viable to
> derive SQL table schemas from Avro schemas, because table schemas would be
> the ground truth by design.
> Moreover, one Avro type can be mapped to multiple Flink types, so in
> practice maybe it’s also not viable.
>
> Best,
> Paul Lam
>
> 2021年4月2日 11:34,Sumeet Malhotra  写道:
>
> Just realized, my question was probably not clear enough. :-)
>
> I understand that the Avro (or JSON for that matter) format can be
> ingested as described here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#apache-avro-format,
> but this still requires the entire table specification to be written in the
> "CREATE TABLE" section. Is it possible to just specify the Avro schema and
> let Flink map it to an SQL table?
>
> BTW, the above link is titled "Table API Legacy Connectors", so is this
> still supported? Same question for YAML specification.
>
> Thanks,
> Sumeet
>
> On Fri, Apr 2, 2021 at 8:26 AM Sumeet Malhotra 
> wrote:
>
>> Hi,
>>
>> Is it possible to directly import Avro schema while ingesting data into
>> Flink? Or do we always have to specify the entire schema in either SQL DDL
>> for Table API or using DataStream data types? From a code maintenance
>> standpoint, it would be really helpful to keep one source of truth for the
>> schema somewhere.
>>
>> Thanks,
>> Sumeet
>>
>
>


Re: Proper way to get DataStream

2021-04-08 Thread Arvid Heise
Hi Maminspapin,

I just answered another question similarly, so let me just c&p it here:

The beauty of Avro lies in having reader and writer schema and schema
compatibility, such that if your schema evolves over time (which will
happen in streaming naturally but is also very common in batch), you can
still use your application as is without modification. For streaming, this
methodology also implies that you can process elements with different
schema versions in the same run, which is mandatory for any non-toy example.

If you read into this topic, you will realize that it doesn't make sense to
read from Avro without specifying your reader schema (except for some
generic applications, but they should be written in DataStream). If you
keep in mind that your same dataset could have different schemas, you will
notice that your ideas quickly reach some limitations (which schema to
take?). What you could do is write a small script to generate the
schema DDL from your current schema in your actual data if you have very
many columns and datasets. It certainly would also be an interesting idea
to pass a static Avro/Json schema to the DDL.

Note that in KafkaStreams, you have the same issue. You usually generate
your Java classes from some schema version, which will become your reader
schema. You can and should do the same in Flink. Please read [1] for more
information.

[1] https://www.baeldung.com/java-apache-avro#read-schema
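
For illustration, a hedged sketch of how that looks in a DataStream job with
the Confluent registry deserializer (topic, bootstrap servers, registry URL
and schema path are placeholders): the registry supplies the writer schema per
record, while the parsed .avsc acts as the reader schema.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class AvroKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema kept under version control next to the job code.
        Schema readerSchema = new Schema.Parser()
            .parse(new String(Files.readAllBytes(Paths.get("schema.avsc"))));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        // The deserializer looks up the writer schema in the registry for each record
        // and resolves it against the fixed reader schema.
        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
            "my-topic",
            ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://localhost:8081"),
            props);

        DataStream<GenericRecord> stream = env.addSource(consumer);
        stream.print();
        env.execute("avro-from-kafka-sketch");
    }
}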

On Sun, Apr 4, 2021 at 4:21 PM Maminspapin  wrote:

> Hi, @Arvid Heise-4, @Matthias
>
> I'm very appreciate for your attention, guys. And sorry for my late reply.
>
> Yes, Arvid, you are right, the second way in fact works. I copied the schema
> from the Schema Registry using its API and created the .avsc format file. And
> thanks again for explaining to me why the first way is not compatible.
>
> So, my code to define schema is (I don't know is it good decision...):
>
> Path path = Paths.get("path_to_schema/schema.avsc");
> String content = new String(Files.readAllBytes(path));
> Schema schema = new Schema.Parser().parse(content);
>
> And it really works.
>
> But, I don't understand why should I use two schemas:
> 1. schema I created (reader schema)
> 2. schema I get with SR url (writer schema)
>
> I have some experience with the KafkaStreams lib, and using it there is no need
> to get a reader schema. There is one service to communicate with schemas -
> the Schema Registry. Why not use a single source to get the schema in Flink?
>
>
> Again, the second way is correct, and I can go further with my program.
>
> Thanks.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>