Re: Streaming SQL support for redis streaming connector

2021-09-14 Thread Yangze Guo
> What about the bahir streaming connectors? Are they considered canonical? 
> Would they be merged in to the main project at some point? Iiuc we can use 
> table API etc to write data to redis using that right?

I think it can still be used with Flink 1.12 and 1.13. However, it has
not been updated for 5 months and is not implemented on top of the new
interfaces introduced in FLIP-95. AFAIK, the community does not plan
to merge it.

> What more will be required for us to use SQL? (as in specify the connector in 
> the WITH clause as redis)

To use the Bahir connector with SQL, you need to build the project, move the
uber jar into the lib directory of your Flink distribution, and fill in the
necessary properties [1][2].

[1] 
https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
[2] 
https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
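
For illustration, a minimal sketch of what the SQL side could then look like
once the uber jar is on the classpath. The table schema and all property keys
below are assumptions for illustration only; the authoritative key names are
defined in RedisValidator [2].

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RedisSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // NOTE: the property keys are illustrative assumptions; check
        // RedisValidator [2] for the exact names the Bahir connector supports.
        tEnv.executeSql(
                "CREATE TABLE redis_sink (" +
                "  k STRING," +
                "  v STRING" +
                ") WITH (" +
                "  'connector.type' = 'redis'," +
                "  'redis-mode' = 'single'," +
                "  'command' = 'SET'" +
                ")");

        tEnv.executeSql("INSERT INTO redis_sink VALUES ('hello', 'world')");
    }
}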


Best,
Yangze Guo

On Wed, Sep 15, 2021 at 11:34 AM Osada Paranaliyanage
 wrote:
>
> Hi David,
>
>
>
>   What about the bahir streaming connectors? Are they considered canonical? 
> Would they be merged in to the main project at some point? Iiuc we can use 
> table API etc to write data to redis using that right? What more will be 
> required for us to use SQL? (as in specify the connector in the WITH clause 
> as redis)
>
>
>
> Thanks,
>
> Osada.
>
>
>
> From: David Morávek 
> Sent: Tuesday, September 14, 2021 7:53 PM
> To: Osada Paranaliyanage 
> Cc: user@flink.apache.org
> Subject: Re: Streaming SQL support for redis streaming connector
>
>
>
> [EXTERNAL EMAIL] This email has been received from an external source – 
> please review before actioning, clicking on links, or opening attachments.
>
> Hi Osada,
>
>
>
> in theory building a Redis table from "CDC stream" should definitely be 
> doable. Unfortunately Flink currently doesn't have any official Redis Sink 
> for the Table API and there is currently no on-going effort for adding it, so 
> it would need to be implemented first. The resulting syntax would be pretty 
> much the same to what's outlined in the mentioned example.
>
>
>
> Best,
>
> D.
>
>
>
> On Tue, Sep 14, 2021 at 2:57 PM Osada Paranaliyanage 
>  wrote:
>
> Hi All, We are looking to use flink to build a materialized view of a 
> relation db and a document db using cdc streams. For this purpose we would 
> like to use redis for hosting the materialized view. Can we do this in 
> streaming SQL? We have worked through 
> https://github.com/ververica/flink-sql-CDC and can see how this will work 
> with ES as a sink. But can we use redis as the sink? Where do we find the 
> syntax for that?
>
>
>
> Thanks,
>
> Osada.
>
>
>
>
>
> 
>
>
>
> This e-mail is confidential. It may also be legally privileged. If you are 
> not the intended recipient or have received it in error, please delete it and 
> all copies from your system and notify the sender immediately by return 
> e-mail. Any unauthorized reading, reproducing, printing or further 
> dissemination of this e-mail or its contents is strictly prohibited and may 
> be unlawful. Internet communications cannot be guaranteed to be timely, 
> secure, error or virus-free. The sender does not accept liability for any 
> errors or omissions.
>
>
>


RE: Streaming SQL support for redis streaming connector

2021-09-14 Thread Osada Paranaliyanage
Hi David,

  What about the Bahir streaming connectors? Are they considered canonical? 
Would they be merged into the main project at some point? IIUC, we can use 
the Table API etc. to write data to Redis using that, right? What more will be 
required for us to use SQL (as in, specify the connector in the WITH clause as 
redis)?

Thanks,
Osada.

From: David Morávek 
Sent: Tuesday, September 14, 2021 7:53 PM
To: Osada Paranaliyanage 
Cc: user@flink.apache.org
Subject: Re: Streaming SQL support for redis streaming connector


[EXTERNAL EMAIL] This email has been received from an external source – please 
review before actioning, clicking on links, or opening attachments.
Hi Osada,

in theory building a Redis table from "CDC stream" should definitely be doable. 
Unfortunately Flink currently doesn't have any official Redis Sink for the 
Table API and there is currently no on-going effort for adding it, so it would 
need to be implemented first. The resulting syntax would be pretty much the 
same to what's outlined in the mentioned example.

Best,
D.

On Tue, Sep 14, 2021 at 2:57 PM Osada Paranaliyanage 
mailto:osada.paranaliyan...@dialog.lk>> wrote:
Hi All, We are looking to use flink to build a materialized view of a relation 
db and a document db using cdc streams. For this purpose we would like to use 
redis for hosting the materialized view. Can we do this in streaming SQL? We 
have worked through https://github.com/ververica/flink-sql-CDC and can see how 
this will work with ES as a sink. But can we use redis as the sink? Where do we 
find the syntax for that?

Thanks,
Osada.





This e-mail is confidential. It may also be legally privileged. If you are not 
the intended recipient or have received it in error, please delete it and all 
copies from your system and notify the sender immediately by return e-mail. Any 
unauthorized reading, reproducing, printing or further dissemination of this 
e-mail or its contents is strictly prohibited and may be unlawful. Internet 
communications cannot be guaranteed to be timely, secure, error or virus-free. 
The sender does not accept liability for any errors or omissions.







Re: Error while fetching data from Apache Kafka

2021-09-14 Thread Dhiru
 Thanks @Caizhi, let me look into this. 
On Monday, September 13, 2021, 10:24:03 PM EDT, Caizhi Weng 
 wrote:  
 
 Hi!
This seems to be caused by some mismatching types in your source definition and
your workflow. If possible, could you describe the schema of your Kafka source
and paste your DataStream / Table / SQL code here?
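
For illustration, a minimal Flink 1.12 Java sketch of the usual fix, assuming
the source currently emits plain Strings via SimpleStringSchema and should be
parsed explicitly instead of being cast to a case class. The MyEvent type, its
parse method, the topic name, and the broker address are hypothetical stand-ins.

import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // assumption: broker address
        props.setProperty("group.id", "example");

        // The consumer emits plain Strings; convert them explicitly instead of
        // letting a downstream operator assume a Scala case-class / Product type.
        DataStream<String> raw = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

        DataStream<MyEvent> events = raw.map(new MapFunction<String, MyEvent>() {
            @Override
            public MyEvent map(String line) {
                return MyEvent.parse(line); // hypothetical parsing logic
            }
        });

        events.print();
        env.execute("kafka-source-sketch");
    }

    /** Hypothetical event type standing in for the user's actual record type. */
    public static class MyEvent {
        public String payload;
        static MyEvent parse(String line) {
            MyEvent e = new MyEvent();
            e.payload = line;
            return e;
        }
    }
}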
Dhiru  wrote on Tue, Sep 14, 2021 at 3:49 AM:

I am not sure why, when we try to receive data from Apache Kafka, I get this error, 
but it works fine for me when I run via Confluent Kafka. 
java.lang.ClassCastException: class java.lang.String cannot be cast to class 
scala.Product (java.lang.String is in module java.base of loader 'bootstrap'; 
scala.Product is in unnamed module of loader 'app')
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:96)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

  

Fast serialization for Kotlin data classes

2021-09-14 Thread Alex Cruise
Hi there,

I appreciate the fact that Flink has built-in support for making POJO and
Scala `case class` serialization faster, but in my project we use immutable
Kotlin `data class`es (analogous to Scala `case class`es) extensively, and
we'd really prefer not to make them POJOs, mostly for style/taste reasons
(e.g. need a default constructor and setters, both are anathema!)

Does anyone know of a good way for us to keep using idiomatic, immutable
Kotlin data classes, but to get much faster serialization performance in
Flink?

Thanks!

-0xe1a
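
For context, the POJO shape Flink's built-in serializer expects looks roughly
like the following; a minimal sketch, with an illustrative class name and fields,
of the ceremony the question wants to avoid.

public class ClickEvent {
    // public fields (or private fields with getters/setters) are required
    public String userId;
    public long timestamp;

    // a public no-arg constructor is required for Flink to treat this as a POJO
    public ClickEvent() {}

    public ClickEvent(String userId, long timestamp) {
        this.userId = userId;
        this.timestamp = timestamp;
    }
}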


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
Thanks David. What you are saying makes sense. But I keep hearing that I
shouldn't delete the topic externally, and I keep asking why Flink doesn't
forget about the topic IF it has in fact been deleted externally (for
whatever reason).

I think I will drop this now.

On Tue, Sep 14, 2021 at 5:50 PM David Morávek  wrote:

> We are basically describing the same thing with Fabian, just a different
> wording.
>
> The problem is that if you delete the topic externally, you're making an
> assumption that downstream processor (Flink in this case) has already
> consumed and RELIABLY processed all of the data from that topic (which may
> not be true). This would effectively lead to AT_MOST_ONCE delivery
> guarantees (in other words, we are OK with loosing data), which is a
> trade-off that _in_my_opinion_ we shouldn't make here.
>
> Best,
> D.
>
> On Tue, Sep 14, 2021 at 4:37 PM Constantinos Papadopoulos <
> cpa...@gmail.com> wrote:
>
>> Hi all,
>>
>> Thank you for the replies, they are much appreciated.
>>
>> I'm sure I'm missing something obvious here, so bear with me...
>>
>> Fabian, regarding:
>>
>> "Flink will try to recover from the previous checkpoint which is invalid
>> by now because the partition is not available anymore."
>>
>> The above would happen because the partition is not available anymore in
>> Kafka (right?), and not because Flink's partition discoverer has removed it
>> from its cache (i.e. even if Flink leaves it there, the topic doesn't exist
>> in Kafka anymore, so that's the source of the problem in the scenario you
>> outlined). In other words, what would be the *extra* harm from Flink
>> cleaning up the partition from its cache after it knows that the partition
>> is gone - this is the part I still don't understand.
>>
>> David, similarly:
>>
>> "actual topic deletion would need to be performed by Flink (not by the
>> 3rd party system as suggested in the original question)"
>>
>> The situation is that the topic has, for better or worse, already been
>> deleted. So my question is one of cleanup, i.e. how is it useful for Flink
>> to continue remembering the partition of an already-deleted topic? (the
>> checkpoint is invalid regardless, right?)
>>
>>
>>
>> On Tue, Sep 14, 2021 at 5:20 PM Jan Lukavský  wrote:
>>
>>> On 9/14/21 3:57 PM, David Morávek wrote:
>>>
>>> Hi Jan,
>>>
>>> Notion of completeness is just one part of the problem. The second part
>>> is that once you remove the Kafka topic, you are no longer able to replay
>>> the data in case of failure.
>>>
>>> So you basically need a following workflow to ensure correctness:
>>>
>>> 1) Wait until there are no more elements in the topic (this can be done
>>> by checking watermark for that partition as you're suggesting)
>>> 2) Take a checkpoint N
>>> 3) Delete the topic (this effectively makes all the checkpoints < N
>>> invalid)
>>>
>>> Agree.
>>>
>>>
>>> If you switch order of 2) and 3) you have no way to recover from failure.
>>>
>>> Also for this to work properly, actual topic deletion would need to be
>>> performed by Flink (not by the 3rd party system as suggested in the
>>> original question) in the second phase of 2PC (when you're sure that you've
>>> successfully taken a checkpoint, that has seen all the data).
>>>
>>> Agree, the deletion would have to be preceded by something like
>>> partition drain. What is needed is the watermark reaching end of global
>>> window (+inf) and a checkpoint. After that, the source can be removed and
>>> what happens with it is no concern any more. That applies to all sources in
>>> general. I don't know the implementation details, but it seems that the
>>> topic would have to be somehow marked as "draining", it would then be the
>>> responsibility of the reader to shift the watermark belonging to partitions
>>> of that topic to +inf. It would then be responsibility of Flink to verify
>>> that such source is removed only after a checkpoint is taken. Otherwise
>>> there would be possible risk of data loss.
>>>
>>> This definitely looks like quite complex process.
>>>
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský  wrote:
>>>
 Hi,

 just out of curiosity, would this problem be solvable by the ability to
 remove partitions, that declare, that do not contain more data
 (watermark reaching end of global window)? There is probably another
 problem with that topic can be recreated after being deleted, which
 could result in watermark moving back in time, but this problem might
 be
 there already.

   Jan

 On 9/14/21 3:08 PM, Fabian Paul wrote:
 > Hi Constantinos,
 >
 > I agree with David that it is not easily possible to remove a
 partition while a Flink job is running. Imagine the following scenario:
 >
 > Your Flink job initially works on 2 partitions belonging to two
 different topics and you have checkpointing enabled to guarantee
 > exactly-once delivery. It implies that on every checkpoin

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread David Morávek
Hi Sandeep,

Jan has already provided pretty good guidelines for getting more context on
the issue ;)

Because this is not the first time, I would like to raise awareness
that it's not OK to send a user-related question to four Apache mailing
lists (that I know of). Namely:

- user@flink.apache.org
- d...@flink.apache.org
- u...@beam.apache.org
- d...@beam.apache.org

Community focus is a very precious resource that should be used wisely.
All of these mailing lists answer many complex questions each day,
and it's very unfortunate if any of this work needs to be duplicated. Next
time, please direct Beam-related user questions solely to u...@beam.apache.org.

Thanks for your understanding. You can consult the community guidelines [1][2]
if you are not sure where a particular question belongs.

[1] https://flink.apache.org/community.html#mailing-lists
[2] https://beam.apache.org/community/contact-us/

Best,
D.

On Tue, Sep 14, 2021 at 5:47 PM Jan Lukavský  wrote:

> Hi Sandeep,
> a few questions:
>  a) which state backend do you use for Flink?
>  b) what is your checkpointingInterval set for FlinkRunner?
>  c) how much data is there in your input Kafka topic(s)?
>
> FileIO has to buffer all elements per window (by default) into state, so
> this might create a high pressure on state backend and/or heap, which could
> result in suboptimal performance. Due to the "connection loss" and timeout
> exceptions you describe I'd suppose there might be a lot of GC pressure.
>
>  Jan
> On 9/14/21 5:20 PM, Kathula, Sandeep wrote:
>
> Hi,
>
>We have a simple Beam application which reads from Kafka, converts to
> parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We
> have a fixed window of 5 minutes after conversion to
> PCollection and then writing to S3. We have around 320
> columns in our data. Our intention is to write large files of size 128MB or
> more so that it won’t have a small file problem when reading back from
> Hive. But from what we observed it is taking too much memory to write to S3
> (giving memory of 8GB to heap is not enough to write 50 MB files and it is
> going OOM). When I increase memory for heap to 32GB then it take lot of
> time to write records to s3.
>
> For instance it takes:
>
>
>
> 20 MB file - 30 sec
>
> 50 MB file - 1 min 16 sec
>
> 75 MB file - 2 min 15 sec
>
> 83 MB file - 2 min 40 sec
>
>
>
> Code block to write to S3:
>
> PCollection parquetRecord = …….
>
>
>
> parquetRecord.apply(FileIO.write()
> .via(ParquetIO.sink(getOutput_schema()))
> .to(outputPath.isEmpty() ? outputPath() : outputPath)
> .withNumShards(5)
> .withNaming(new CustomFileNaming("snappy.parquet")));
>
>
>
>
>
> We are also getting different exceptions like:
>
>
>
>1. UserCodeException:
>
>
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>
> at
> com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>
> at
> org.apache.beam.runners.core.Sim

Re: RocksDB state not cleaned up

2021-09-14 Thread tao xiao
Hi David,

If I read Stephan's comment correctly, TTL doesn't work well for cases where
we have too many levels, like fast-growing state, as compaction doesn't
clean up high-level SST files in time. Is this correct? If yes, should we
register a timer with the TTL time and manually clean up the state
(state.clear()) when the timer fires?

I will turn on RocksDB logging as well as compaction logging [1] to verify
this

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
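
As a rough illustration of the manual variant mentioned above (a processing-time
timer that clears the state when it fires), here is a minimal sketch; the value
type is simplified to String and the descriptor name and TTL are illustrative.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ManualTtlFunction extends KeyedProcessFunction<String, String, String> {

    private static final long TTL_MS = 24 * 60 * 60 * 1000L; // 1 day, matching the TTL config

    private transient ValueState<String> clickState;

    @Override
    public void open(Configuration parameters) {
        clickState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("click", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (clickState.value() == null) {
            clickState.update(value);
            // schedule an explicit cleanup instead of relying solely on compaction-based TTL
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TTL_MS);
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        clickState.clear(); // remove the entry for the current key once the timer fires
    }
}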


On Tue, Sep 14, 2021 at 5:38 PM David Morávek  wrote:

> Hi Tao,
>
> my intuition is that the compaction of SST files is not triggering. By
> default, it's only triggered by the size ratios of different levels [1] and
> the TTL mechanism has no effect on it.
>
> Some reasoning from Stephan:
>
> It's very likely to have large files in higher levels that haven't been
>> compacted in a long time and thus just stay around.
>>
>> This might be especially possible if you insert a lot in the beginning
>> (build up many levels) and then have a moderate rate of modifications, so
>> the changes and expiration keep happening purely in the merges /
>> compactions of the first levels. Then the later levels may stay unchanged
>> for quite some time.
>>
>
> You should be able to see compaction details by setting RocksDB logging to
> INFO [2]. Can you please check these and validate whether this really is
> the case?
>
> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
> [2]
> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>
> Best,
> D.
>
> On Mon, Sep 13, 2021 at 3:18 PM tao xiao  wrote:
>
>> Hi team
>>
>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>> The TTL update type is OnCreateAndWrite. We set the value state when the
>> value state doesn't exist and we never update it again after the state is
>> not empty. The key of the value state is timestamp. My understanding of
>> such TTL settings is that the size of all SST files remains flat (let's
>> disregard the impact space amplification brings) after 1 day as the daily
>> data volume is more or less the same. However the RocksDB native metrics
>> show that the SST files continue to grow since I started the job. I check
>> the SST files in local storage and I can see SST files with age 1 months
>> ago (when I started the job). What is the possible reason for the SST files
>> not cleaned up?.
>>
>> The Flink version is 1.12.1
>> State backend is RocksDB with incremental checkpoint
>> All default configuration for RocksDB
>> Per job mode in Yarn and checkpoint to S3
>>
>>
>> Here is the code to set value state
>>
>> public void open(Configuration parameters) {
>> StateTtlConfig ttlConfigClick = StateTtlConfig
>> .newBuilder(Time.days(1))
>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> 
>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> .cleanupInRocksdbCompactFilter(300_000)
>> .build();
>> ValueStateDescriptor clickStateDescriptor = new 
>> ValueStateDescriptor<>("click", Click.class);
>> clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>> clickState = getRuntimeContext().getState(clickStateDescriptor);
>>
>> StateTtlConfig ttlConfigAds = StateTtlConfig
>> .newBuilder(Time.days(1))
>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> 
>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> .cleanupInRocksdbCompactFilter(30_000_000)
>> .build();
>> ValueStateDescriptor adsStateDescriptor = new 
>> ValueStateDescriptor<>("ads", slimAdsClass);
>> adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>> adsState = getRuntimeContext().getState(adsStateDescriptor);
>> }
>>
>> @Override
>> public void processElement(Tuple3 tuple, Context ctx, 
>> Collector collector) throws Exception {
>> if (tuple.f1 != null) {
>> Click click = tuple.f1;
>>
>> if (clickState.value() != null) {
>> return;
>> }
>>
>> clickState.update(click);
>>
>> A adsFromState = adsState.value();
>> if (adsFromState != null) {
>> collector.collect(adsFromState);
>> }
>> } else {
>> A ads = tuple.f2;
>>
>> if (adsState.value() != null) {
>> return;
>> }
>>
>> adsState.update(ads);
>>
>> Click clickFromState = clickState.value();
>> if (clickFromState != null) {
>> collector.collect(ads);
>> }
>> }
>> }
>>
>>
>> Here is the snippet of sst files in local storage
>>
>> [root@ db]# ll | head -n10
>> total 76040068
>> -rw-r- 1 hadoop yarn0 Aug 16 08:46 03.log
>> -rw-r- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>> -rw-r- 1 hadoop yarn 

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread Jan Lukavský

Hi Sandeep,
a few questions:
 a) which state backend do you use for Flink?
 b) what is your checkpointingInterval set for FlinkRunner?
 c) how much data is there in your input Kafka topic(s)?

FileIO has to buffer all elements per window (by default) into state, so 
this might create high pressure on the state backend and/or heap, which 
could result in suboptimal performance. Due to the "connection loss" and 
timeout exceptions you describe, I'd suppose there might be a lot of GC 
pressure.


 Jan
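
For reference, a minimal sketch of how (b) is typically set programmatically on
the FlinkRunner; the broker and transform details are omitted, the state backend
comment is an assumption about a cluster-managed flink-conf.yaml, and option
names should be double-checked against your Beam version.

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerOptionsSketch {
    public static void main(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);

        // (b) checkpointing interval in milliseconds; checkpointing is disabled by default
        options.setCheckpointingInterval(60_000L);

        // (a) the state backend is usually configured on the Flink cluster itself,
        //     e.g. `state.backend: rocksdb` in flink-conf.yaml (assumption)

        Pipeline pipeline = Pipeline.create(options);
        // ... Kafka -> Parquet -> S3 transforms go here ...
        pipeline.run().waitUntilFinish();
    }
}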

On 9/14/21 5:20 PM, Kathula, Sandeep wrote:


Hi,

We have a simple Beam application which reads from Kafka, converts to 
parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). 
We have a fixed window of 5 minutes after conversion to 
PCollection and then writing to S3. We have around 320 
columns in our data. Our intention is to write large files of size 
128MB or more so that it won’t have a small file problem when reading 
back from Hive. But from what we observed it is taking too much memory 
to write to S3 (giving memory of 8GB to heap is not enough to write 50 
MB files and it is going OOM). When I increase memory for heap to 32GB 
then it take lot of time to write records to s3.


For instance it takes:

20 MB file - 30 sec

50 MB file - 1 min 16 sec

75 MB file - 2 min 15 sec

83 MB file - 2 min 40 sec

Code block to write to S3:

PCollection parquetRecord = …….

parquetRecord.apply(FileIO.write()
    .via(ParquetIO.sink(getOutput_schema()))
    .to(outputPath.isEmpty() ? outputPath() : outputPath)
    .withNumShards(5)
    .withNaming(new CustomFileNaming("snappy.parquet")));

We are also getting different exceptions like:

 1. UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator


at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)


at 
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown 
Source)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)


at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)


at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)


at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)


at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)


at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)


at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)


at java.lang.Iterable.forEach(Iterable.java:75)

at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)


at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown 
Source)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)


at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.col

Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread Kathula, Sandeep
Hi,
   We have a simple Beam application which reads from Kafka, converts to 
parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We have a 
fixed window of 5 minutes after conversion to PCollection and 
then writing to S3. We have around 320 columns in our data. Our intention is to 
write large files of size 128MB or more so that it won’t have a small file 
problem when reading back from Hive. But from what we observed it is taking too 
much memory to write to S3 (giving memory of 8GB to heap is not enough to write 
50 MB files and it is going OOM). When I increase the memory for the heap to 32GB, then 
it takes a lot of time to write records to S3.
For instance it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec

Code block to write to S3:
PCollection parquetRecord = …….

parquetRecord.apply(FileIO.write()
.via(ParquetIO.sink(getOutput_schema()))
.to(outputPath.isEmpty() ? outputPath() : outputPath)
.withNumShards(5)
.withNaming(new CustomFileNaming("snappy.parquet")));
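
For reference, the 5-minute fixed window mentioned above corresponds to a step
like the following, placed just before the write shown here; a sketch only,
assuming PCollection<GenericRecord> and an illustrative transform name.

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class WindowingStep {
    // applies the 5-minute fixed window described above, ahead of FileIO.write()
    static PCollection<GenericRecord> windowForS3(PCollection<GenericRecord> parquetRecord) {
        return parquetRecord.apply(
                "FixedWindow5Min",
                Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5))));
    }
}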


We are also getting different exceptions like:


  1.  UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at 
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
at java.lang.Iterable.forEach(Iterable.java:75)
at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStr

Re: flink : Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/logging/log4j/spi/LoggerContextShutdownAware

2021-09-14 Thread Ragini Manjaiah
Hi David,
yes, you are correct. That solved the issue.

On Tue, Sep 14, 2021 at 5:57 PM David Morávek  wrote:

> From the stacktrace you've shared in the previous email, it seems that
> you're running the code from IDE, is that correct?
>
> This is the part that makes me assume that, because it's touching files
> from local maven repository.
>
> SLF4J: Found binding in
> [jar:file:/Users/z004t01/.m2/repository/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
> Please note that IDE most likely doesn't use the resulting fat jar for
> running your program, but instead constructs the classpath from the
> dependency graph. This most likely comes a transitive dependency from one
> of the hadoop deps, so you can try to exclude it there directly. You can
> use mvn dependency:tree to verify the exclusion.
>
> Best,
> D.
>
> On Tue, Sep 14, 2021 at 2:15 PM Ragini Manjaiah 
> wrote:
>
>> Hi David,
>> please find my pom.xml . where I have excluded the slf4j-log4j12
>> dependency . even after excluding encountering this issue
>>
>> 
>> http://maven.apache.org/POM/4.0.0";
>>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>> 4.0.0
>>
>> flinkTest
>> *
>> 1.0-SNAPSHOT
>>
>>
>> 
>> 1.11.3
>> 2.11
>> 
>> 
>>
>> 
>> org.apache.flink
>> flink-connector-elasticsearch7_2.11
>> 1.10.0
>> 
>>
>> 
>> org.apache.flink
>> flink-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-java
>> ${flink.version}
>> 
>>
>> 
>> org.apache.flink
>> flink-streaming-java_2.11
>> ${flink.version}
>> 
>>
>> 
>> org.apache.flink
>> 
>> flink-statebackend-rocksdb_${scala.version}
>> ${flink.version}
>> 
>>
>> 
>> org.apache.flink
>> flink-clients_${scala.version}
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-core
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-avro
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> 
>> flink-connector-kafka-0.11_${scala.version}
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-test-utils_${scala.version}
>> ${flink.version}
>> 
>> 
>> nl.basjes.parse.useragent
>> yauaa
>> 1.3
>> 
>>
>> 
>> com.googlecode.json-simple
>> json-simple
>> 1.1
>> 
>> 
>> de.javakaffee
>> kryo-serializers
>> 0.38
>> 
>> 
>> com.github.wnameless
>> json-flattener
>> 0.5.0
>> 
>> 
>> joda-time
>> joda-time
>> 2.9.1
>> 
>> 
>> com.google.code.gson
>> gson
>> 2.2.4
>> 
>> 
>> org.json
>> json
>> 20200518
>>
>> 
>> 
>> org.apache.hadoop
>> hadoop-common
>> 3.2.0
>> 
>> 
>> org.apache.hadoop
>> hadoop-mapreduce-client-core
>> 3.2.0
>> 
>>
>>
>> 
>> 
>> 
>> spring-repo
>> https://repo1.maven.org/maven2/
>> 
>> 
>>
>>
>> 
>> 
>>
>> 
>> org.apache.maven.plugins
>> maven-compiler-plugin
>> 3.1
>> 
>> 1.8
>> 1.8
>> 
>> 
>>
>> 
>>
>> 
>> 
>> 
>> org.apache.maven.plugins
>> maven-shade-plugin
>> 3.0.0
>> 
>> 
>> 
>> package
>> 
>> shade
>> 
>> 
>> 
>> 
>> 
>> org.apache.flink:force-shading
>> 
>> com.google.code.findbugs:jsr305
>> org.slf4j:*
>> log4j:*
>> org.slf4j:slf4j-api
>> org.slf4j:slf4j-log4j12
>> log4j:log4j
>>
>> 

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread David Morávek
We are basically describing the same thing as Fabian, just with different
wording.

The problem is that if you delete the topic externally, you're making an
assumption that the downstream processor (Flink in this case) has already
consumed and RELIABLY processed all of the data from that topic (which may
not be true). This would effectively lead to AT_MOST_ONCE delivery
guarantees (in other words, we are OK with losing data), which is a
trade-off that _in_my_opinion_ we shouldn't make here.

Best,
D.

On Tue, Sep 14, 2021 at 4:37 PM Constantinos Papadopoulos 
wrote:

> Hi all,
>
> Thank you for the replies, they are much appreciated.
>
> I'm sure I'm missing something obvious here, so bear with me...
>
> Fabian, regarding:
>
> "Flink will try to recover from the previous checkpoint which is invalid
> by now because the partition is not available anymore."
>
> The above would happen because the partition is not available anymore in
> Kafka (right?), and not because Flink's partition discoverer has removed it
> from its cache (i.e. even if Flink leaves it there, the topic doesn't exist
> in Kafka anymore, so that's the source of the problem in the scenario you
> outlined). In other words, what would be the *extra* harm from Flink
> cleaning up the partition from its cache after it knows that the partition
> is gone - this is the part I still don't understand.
>
> David, similarly:
>
> "actual topic deletion would need to be performed by Flink (not by the 3rd
> party system as suggested in the original question)"
>
> The situation is that the topic has, for better or worse, already been
> deleted. So my question is one of cleanup, i.e. how is it useful for Flink
> to continue remembering the partition of an already-deleted topic? (the
> checkpoint is invalid regardless, right?)
>
>
>
> On Tue, Sep 14, 2021 at 5:20 PM Jan Lukavský  wrote:
>
>> On 9/14/21 3:57 PM, David Morávek wrote:
>>
>> Hi Jan,
>>
>> Notion of completeness is just one part of the problem. The second part
>> is that once you remove the Kafka topic, you are no longer able to replay
>> the data in case of failure.
>>
>> So you basically need a following workflow to ensure correctness:
>>
>> 1) Wait until there are no more elements in the topic (this can be done
>> by checking watermark for that partition as you're suggesting)
>> 2) Take a checkpoint N
>> 3) Delete the topic (this effectively makes all the checkpoints < N
>> invalid)
>>
>> Agree.
>>
>>
>> If you switch order of 2) and 3) you have no way to recover from failure.
>>
>> Also for this to work properly, actual topic deletion would need to be
>> performed by Flink (not by the 3rd party system as suggested in the
>> original question) in the second phase of 2PC (when you're sure that you've
>> successfully taken a checkpoint, that has seen all the data).
>>
>> Agree, the deletion would have to be preceded by something like partition
>> drain. What is needed is the watermark reaching end of global window (+inf)
>> and a checkpoint. After that, the source can be removed and what happens
>> with it is no concern any more. That applies to all sources in general. I
>> don't know the implementation details, but it seems that the topic would
>> have to be somehow marked as "draining", it would then be the
>> responsibility of the reader to shift the watermark belonging to partitions
>> of that topic to +inf. It would then be responsibility of Flink to verify
>> that such source is removed only after a checkpoint is taken. Otherwise
>> there would be possible risk of data loss.
>>
>> This definitely looks like quite complex process.
>>
>>
>> Best,
>> D.
>>
>> On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> just out of curiosity, would this problem be solvable by the ability to
>>> remove partitions, that declare, that do not contain more data
>>> (watermark reaching end of global window)? There is probably another
>>> problem with that topic can be recreated after being deleted, which
>>> could result in watermark moving back in time, but this problem might be
>>> there already.
>>>
>>>   Jan
>>>
>>> On 9/14/21 3:08 PM, Fabian Paul wrote:
>>> > Hi Constantinos,
>>> >
>>> > I agree with David that it is not easily possible to remove a
>>> partition while a Flink job is running. Imagine the following scenario:
>>> >
>>> > Your Flink job initially works on 2 partitions belonging to two
>>> different topics and you have checkpointing enabled to guarantee
>>> > exactly-once delivery. It implies that on every checkpoint the offsets
>>> of the Kafka topic are stored in a Flink checkpoint to recover
>>> > from them in case of a failure.
>>> > Now you trigger the removal of one of the topics and the discovery
>>> detects that one of the partitions was removed. If the pipeline
>>> > now fails before the next checkpoint was taken Flink will try to
>>> recover from the previous checkpoint which is invalid by now because
>>> > the partition is not available anymore.
>>> >
>>> > Only if

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
Hi all,

Thank you for the replies, they are much appreciated.

I'm sure I'm missing something obvious here, so bear with me...

Fabian, regarding:

"Flink will try to recover from the previous checkpoint which is invalid by
now because the partition is not available anymore."

The above would happen because the partition is not available anymore in
Kafka (right?), and not because Flink's partition discoverer has removed it
from its cache (i.e. even if Flink leaves it there, the topic doesn't exist
in Kafka anymore, so that's the source of the problem in the scenario you
outlined). In other words, what would be the *extra* harm from Flink
cleaning up the partition from its cache after it knows that the partition
is gone - this is the part I still don't understand.

David, similarly:

"actual topic deletion would need to be performed by Flink (not by the 3rd
party system as suggested in the original question)"

The situation is that the topic has, for better or worse, already been
deleted. So my question is one of cleanup, i.e. how is it useful for Flink
to continue remembering the partition of an already-deleted topic? (the
checkpoint is invalid regardless, right?)



On Tue, Sep 14, 2021 at 5:20 PM Jan Lukavský  wrote:

> On 9/14/21 3:57 PM, David Morávek wrote:
>
> Hi Jan,
>
> Notion of completeness is just one part of the problem. The second part is
> that once you remove the Kafka topic, you are no longer able to replay the
> data in case of failure.
>
> So you basically need a following workflow to ensure correctness:
>
> 1) Wait until there are no more elements in the topic (this can be done by
> checking watermark for that partition as you're suggesting)
> 2) Take a checkpoint N
> 3) Delete the topic (this effectively makes all the checkpoints < N
> invalid)
>
> Agree.
>
>
> If you switch order of 2) and 3) you have no way to recover from failure.
>
> Also for this to work properly, actual topic deletion would need to be
> performed by Flink (not by the 3rd party system as suggested in the
> original question) in the second phase of 2PC (when you're sure that you've
> successfully taken a checkpoint, that has seen all the data).
>
> Agree, the deletion would have to be preceded by something like partition
> drain. What is needed is the watermark reaching end of global window (+inf)
> and a checkpoint. After that, the source can be removed and what happens
> with it is no concern any more. That applies to all sources in general. I
> don't know the implementation details, but it seems that the topic would
> have to be somehow marked as "draining", it would then be the
> responsibility of the reader to shift the watermark belonging to partitions
> of that topic to +inf. It would then be responsibility of Flink to verify
> that such source is removed only after a checkpoint is taken. Otherwise
> there would be possible risk of data loss.
>
> This definitely looks like quite complex process.
>
>
> Best,
> D.
>
> On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský  wrote:
>
>> Hi,
>>
>> just out of curiosity, would this problem be solvable by the ability to
>> remove partitions, that declare, that do not contain more data
>> (watermark reaching end of global window)? There is probably another
>> problem with that topic can be recreated after being deleted, which
>> could result in watermark moving back in time, but this problem might be
>> there already.
>>
>>   Jan
>>
>> On 9/14/21 3:08 PM, Fabian Paul wrote:
>> > Hi Constantinos,
>> >
>> > I agree with David that it is not easily possible to remove a partition
>> while a Flink job is running. Imagine the following scenario:
>> >
>> > Your Flink job initially works on 2 partitions belonging to two
>> different topics and you have checkpointing enabled to guarantee
>> > exactly-once delivery. It implies that on every checkpoint the offsets
>> of the Kafka topic are stored in a Flink checkpoint to recover
>> > from them in case of a failure.
>> > Now you trigger the removal of one of the topics and the discovery
>> detects that one of the partitions was removed. If the pipeline
>> > now fails before the next checkpoint was taken Flink will try to
>> recover from the previous checkpoint which is invalid by now because
>> > the partition is not available anymore.
>> >
>> > Only if you do not care about loosing data it is possible to simply
>> ignore the removed partition.
>> >
>> > Best,
>> > Fabian
>>
>


Re: Streaming SQL support for redis streaming connector

2021-09-14 Thread David Morávek
Hi Osada,

In theory, building a Redis table from a "CDC stream" should definitely be
doable. Unfortunately, Flink currently doesn't have an official Redis sink
for the Table API and there is currently no ongoing effort to add one, so
it would need to be implemented first. The resulting syntax would be
pretty much the same as what's outlined in the mentioned example.
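
For a rough idea of the work involved, a FLIP-95 style sink needs a factory
along these lines; a minimal sketch only, where the class name and the option
are illustrative, the sink itself is omitted, and the factory would additionally
have to be registered in META-INF/services/org.apache.flink.table.factories.Factory.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;

public class RedisDynamicTableSinkFactory implements DynamicTableSinkFactory {

    // hypothetical option; a real connector would define host/port/command etc.
    private static final ConfigOption<String> HOST =
            ConfigOptions.key("host").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        return "redis"; // the value used in the WITH ('connector' = 'redis') clause
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOST);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // a real implementation would validate the options and return a
        // DynamicTableSink that writes rows to Redis; omitted in this sketch
        throw new UnsupportedOperationException("sketch only");
    }
}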

Best,
D.

On Tue, Sep 14, 2021 at 2:57 PM Osada Paranaliyanage <
osada.paranaliyan...@dialog.lk> wrote:

> Hi All, We are looking to use flink to build a materialized view of a
> relation db and a document db using cdc streams. For this purpose we would
> like to use redis for hosting the materialized view. Can we do this in
> streaming SQL? We have worked through
> https://github.com/ververica/flink-sql-CDC and can see how this will work
> with ES as a sink. But can we use redis as the sink? Where do we find the
> syntax for that?
>
>
>
> Thanks,
>
> Osada.
>
>
>
> --
>
>
> This e-mail is confidential. It may also be legally privileged. If you are
> not the intended recipient or have received it in error, please delete it
> and all copies from your system and notify the sender immediately by return
> e-mail. Any unauthorized reading, reproducing, printing or further
> dissemination of this e-mail or its contents is strictly prohibited and may
> be unlawful. Internet communications cannot be guaranteed to be timely,
> secure, error or virus-free. The sender does not accept liability for any
> errors or omissions.
>
>


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Jan Lukavský

On 9/14/21 3:57 PM, David Morávek wrote:

Hi Jan,

Notion of completeness is just one part of the problem. The second 
part is that once you remove the Kafka topic, you are no longer able 
to replay the data in case of failure.


So you basically need a following workflow to ensure correctness:

1) Wait until there are no more elements in the topic (this can be 
done by checking watermark for that partition as you're suggesting)

2) Take a checkpoint N
3) Delete the topic (this effectively makes all the checkpoints < N 
invalid)

Agree.


If you switch order of 2) and 3) you have no way to recover from failure.

Also for this to work properly, actual topic deletion would need to be 
performed by Flink (not by the 3rd party system as suggested in the 
original question) in the second phase of 2PC (when you're sure that 
you've successfully taken a checkpoint, that has seen all the data).


Agree, the deletion would have to be preceded by something like a 
partition drain. What is needed is the watermark reaching the end of the global 
window (+inf) and a checkpoint. After that, the source can be removed 
and what happens with it is no longer a concern. That applies to all 
sources in general. I don't know the implementation details, but it 
seems that the topic would have to be somehow marked as "draining"; it 
would then be the responsibility of the reader to shift the watermark 
belonging to partitions of that topic to +inf. It would then be the 
responsibility of Flink to verify that such a source is removed only after 
a checkpoint is taken. Otherwise there would be a possible risk of data loss.


This definitely looks like quite complex process.



Best,
D.

On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský > wrote:


Hi,

just out of curiosity, would this problem be solvable by the
ability to
remove partitions, that declare, that do not contain more data
(watermark reaching end of global window)? There is probably another
problem with that topic can be recreated after being deleted, which
could result in watermark moving back in time, but this problem
might be
there already.

  Jan

On 9/14/21 3:08 PM, Fabian Paul wrote:
> Hi Constantinos,
>
> I agree with David that it is not easily possible to remove a
partition while a Flink job is running. Imagine the following
scenario:
>
> Your Flink job initially works on 2 partitions belonging to two
different topics and you have checkpointing enabled to guarantee
> exactly-once delivery. It implies that on every checkpoint the
offsets of the Kafka topic are stored in a Flink checkpoint to recover
> from them in case of a failure.
> Now you trigger the removal of one of the topics and the
discovery detects that one of the partitions was removed. If the
pipeline
> now fails before the next checkpoint was taken Flink will try to
recover from the previous checkpoint which is invalid by now because
> the partition is not available anymore.
>
> Only if you do not care about loosing data it is possible to
simply ignore the removed partition.
>
> Best,
> Fabian



Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread David Morávek
Hi Jan,

The notion of completeness is just one part of the problem. The second part is
that once you remove the Kafka topic, you are no longer able to replay the
data in case of failure.

So you basically need the following workflow to ensure correctness:

1) Wait until there are no more elements in the topic (this can be done by
checking the watermark for that partition, as you're suggesting)
2) Take a checkpoint N
3) Delete the topic (this effectively makes all the checkpoints < N invalid)

If you switch the order of 2) and 3), you have no way to recover from failure.

Also, for this to work properly, the actual topic deletion would need to be
performed by Flink (not by the 3rd party system as suggested in the
original question) in the second phase of 2PC (when you're sure that you've
successfully taken a checkpoint that has seen all the data).
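
As a rough illustration of that ordering only (this is not an existing Flink
mechanism), a sketch where the topic is deleted from notifyCheckpointComplete,
i.e. only after a completed checkpoint has covered the drained data; the class
name is hypothetical and the drain detection is deliberately simplistic.

import java.util.Collections;
import java.util.Properties;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.admin.AdminClient;

public class TopicRetiringSink<T> extends RichSinkFunction<T> implements CheckpointListener {

    private final String topicToRetire;      // the topic being drained (assumption)
    private final String bootstrapServers;
    private transient volatile boolean drained;

    public TopicRetiringSink(String topicToRetire, String bootstrapServers) {
        this.topicToRetire = topicToRetire;
        this.bootstrapServers = bootstrapServers;
    }

    @Override
    public void invoke(T value, Context context) {
        // step 1: treat the topic as drained once the watermark has reached +inf
        if (context.currentWatermark() == Long.MAX_VALUE) {
            drained = true;
        }
        // ... write value downstream ...
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!drained) {
            return; // step 2: never delete before a checkpoint has covered all the data
        }
        // step 3: delete the topic only now, mirroring the second phase of 2PC
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            admin.deleteTopics(Collections.singleton(topicToRetire)).all().get();
        }
    }
}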

Best,
D.

On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský  wrote:

> Hi,
>
> just out of curiosity, would this problem be solvable by the ability to
> remove partitions, that declare, that do not contain more data
> (watermark reaching end of global window)? There is probably another
> problem with that topic can be recreated after being deleted, which
> could result in watermark moving back in time, but this problem might be
> there already.
>
>   Jan
>
> On 9/14/21 3:08 PM, Fabian Paul wrote:
> > Hi Constantinos,
> >
> > I agree with David that it is not easily possible to remove a partition
> while a Flink job is running. Imagine the following scenario:
> >
> > Your Flink job initially works on 2 partitions belonging to two
> different topics and you have checkpointing enabled to guarantee
> > exactly-once delivery. It implies that on every checkpoint the offsets
> of the Kafka topic are stored in a Flink checkpoint to recover
> > from them in case of a failure.
> > Now you trigger the removal of one of the topics and the discovery
> detects that one of the partitions was removed. If the pipeline
> > now fails before the next checkpoint was taken Flink will try to recover
> from the previous checkpoint which is invalid by now because
> > the partition is not available anymore.
> >
> > Only if you do not care about loosing data it is possible to simply
> ignore the removed partition.
> >
> > Best,
> > Fabian
>


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Jan Lukavský

Hi,

just out of curiosity, would this problem be solvable by the ability to 
remove partitions that declare that they do not contain any more data 
(watermark reaching the end of the global window)? There is probably another 
problem in that the topic can be recreated after being deleted, which 
could result in the watermark moving back in time, but this problem might be 
there already.


 Jan

On 9/14/21 3:08 PM, Fabian Paul wrote:

Hi Constantinos,

I agree with David that it is not easily possible to remove a partition while a 
Flink job is running. Imagine the following scenario:

Your Flink job initially works on 2 partitions belonging to two different 
topics and you have checkpointing enabled to guarantee
exactly-once delivery. It implies that on every checkpoint the offsets of the 
Kafka topic are stored in a Flink checkpoint to recover
from them in case of a failure.
Now you trigger the removal of one of the topics and the discovery detects that 
one of the partitions was removed. If the pipeline
now fails before the next checkpoint was taken Flink will try to recover from 
the previous checkpoint which is invalid by now because
the partition is not available anymore.

Only if you do not care about loosing data it is possible to simply ignore the 
removed partition.

Best,
Fabian


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Fabian Paul
Hi Constantinos,

I agree with David that it is not easily possible to remove a partition while a 
Flink job is running. Imagine the following scenario:

Your Flink job initially works on 2 partitions belonging to two different 
topics and you have checkpointing enabled to guarantee
exactly-once delivery. It implies that on every checkpoint the offsets of the 
Kafka topic are stored in a Flink checkpoint to recover 
from them in case of a failure.
Now you trigger the removal of one of the topics and the discovery detects that 
one of the partitions was removed. If the pipeline
now fails before the next checkpoint is taken, Flink will try to recover from 
the previous checkpoint, which is invalid by now because
the partition is not available anymore.

Only if you do not care about losing data is it possible to simply ignore the 
removed partition.

Best,
Fabian

Streaming SQL support for redis streaming connector

2021-09-14 Thread Osada Paranaliyanage
Hi All, We are looking to use Flink to build a materialized view of a relational
db and a document db using CDC streams. For this purpose we would like to use
Redis for hosting the materialized view. Can we do this in streaming SQL? We
have worked through https://github.com/ververica/flink-sql-CDC and can see how
this will work with ES as a sink. But can we use Redis as the sink? Where do we
find the syntax for that?

Thanks,
Osada.
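
As a rough illustration of the WITH-clause syntax used by the flink-sql-CDC demo,
here is a minimal sketch with a MySQL CDC source and an Elasticsearch sink.
Hostnames, credentials, column, table and index names are placeholder assumptions,
and the mysql-cdc and elasticsearch-7 connector jars are assumed to be on the
classpath; a Redis table could only be declared the same way once a Table API
connector for it exists.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcToElasticsearch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Changelog source backed by the MySQL binlog (flink-connector-mysql-cdc).
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id INT,\n"
                        + "  customer_id INT,\n"
                        + "  amount DECIMAL(10, 2),\n"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'mysql-host',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'flink',\n"
                        + "  'password' = 'secret',\n"
                        + "  'database-name' = 'shop',\n"
                        + "  'table-name' = 'orders')");

        // Sink declared purely through the WITH clause; the primary key makes the
        // Elasticsearch sink run in upsert mode, so the index acts as a materialized view.
        tEnv.executeSql(
                "CREATE TABLE orders_view (\n"
                        + "  order_id INT,\n"
                        + "  customer_id INT,\n"
                        + "  amount DECIMAL(10, 2),\n"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'elasticsearch-7',\n"
                        + "  'hosts' = 'http://elasticsearch:9200',\n"
                        + "  'index' = 'orders_view')");

        // Continuously maintain the view from the changelog.
        tEnv.executeSql("INSERT INTO orders_view SELECT * FROM orders");
    }
}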








Re: flink : Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/logging/log4j/spi/LoggerContextShutdownAware

2021-09-14 Thread David Morávek
From the stacktrace you've shared in the previous email, it seems that
you're running the code from the IDE, is that correct?

This is the part that makes me assume that, because it's touching files
from local maven repository.

SLF4J: Found binding in
[jar:file:/Users/z004t01/.m2/repository/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

Please note that the IDE most likely doesn't use the resulting fat jar for
running your program, but instead constructs the classpath from the
dependency graph. The binding most likely comes in as a transitive dependency
of one of the Hadoop deps, so you can try to exclude it there directly. You can
use mvn dependency:tree to verify the exclusion.

Best,
D.

On Tue, Sep 14, 2021 at 2:15 PM Ragini Manjaiah 
wrote:

> Hi David,
> please find my pom.xml . where I have excluded the slf4j-log4j12
> dependency . even after excluding encountering this issue
>
> 
> http://maven.apache.org/POM/4.0.0";
>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
> 4.0.0
>
> flinkTest
> *
> 1.0-SNAPSHOT
>
>
> 
> 1.11.3
> 2.11
> 
> 
>
> 
> org.apache.flink
> flink-connector-elasticsearch7_2.11
> 1.10.0
> 
>
> 
> org.apache.flink
> flink-java
> ${flink.version}
> 
> 
> org.apache.flink
> flink-java
> ${flink.version}
> 
>
> 
> org.apache.flink
> flink-streaming-java_2.11
> ${flink.version}
> 
>
> 
> org.apache.flink
> 
> flink-statebackend-rocksdb_${scala.version}
> ${flink.version}
> 
>
> 
> org.apache.flink
> flink-clients_${scala.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-core
> ${flink.version}
> 
> 
> org.apache.flink
> flink-avro
> ${flink.version}
> 
> 
> org.apache.flink
> 
> flink-connector-kafka-0.11_${scala.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-test-utils_${scala.version}
> ${flink.version}
> 
> 
> nl.basjes.parse.useragent
> yauaa
> 1.3
> 
>
> 
> com.googlecode.json-simple
> json-simple
> 1.1
> 
> 
> de.javakaffee
> kryo-serializers
> 0.38
> 
> 
> com.github.wnameless
> json-flattener
> 0.5.0
> 
> 
> joda-time
> joda-time
> 2.9.1
> 
> 
> com.google.code.gson
> gson
> 2.2.4
> 
> 
> org.json
> json
> 20200518
>
> 
> 
> org.apache.hadoop
> hadoop-common
> 3.2.0
> 
> 
> org.apache.hadoop
> hadoop-mapreduce-client-core
> 3.2.0
> 
>
>
> 
> 
> 
> spring-repo
> https://repo1.maven.org/maven2/
> 
> 
>
>
> 
> 
>
> 
> org.apache.maven.plugins
> maven-compiler-plugin
> 3.1
> 
> 1.8
> 1.8
> 
> 
>
> 
>
> 
> 
> 
> org.apache.maven.plugins
> maven-shade-plugin
> 3.0.0
> 
> 
> 
> package
> 
> shade
> 
> 
> 
> 
> 
> org.apache.flink:force-shading
> 
> com.google.code.findbugs:jsr305
> org.slf4j:*
> log4j:*
> org.slf4j:slf4j-api
> org.slf4j:slf4j-log4j12
> log4j:log4j
>
> 
> 
> 
> 
> 
> *:*
> 
> META-INF/*.SF
>

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
Thank you for your answer David, which is a confirmation of what we see in
the Flink code.

A few thoughts below:


"as this may easily lead to a data loss"

Removing a topic/partition can indeed lead to data loss if not done
carefully. However, *after* the topic has been deleted, I believe it would
be safe for the partition discoverer to forget it, unless I am missing
something.


"partition removal is not even supported by Kafka"

Topic removal is supported, however, and that is our use case here as well.


"You should only be adding new topics / partitions"

Unfortunately, in some cloud environments, partitions are a precious
commodity and it is often unaffordable to scale them up without
subsequently scaling them down.


In my humble view, forgetting a topic's partitions after the topic's
removal should be supported by the partition discoverer (even if it's an
opt-in capability). Would the Flink community be open to a contribution
that does this?


Best regards,

Constantinos Papadopoulos

On Tue, Sep 14, 2021 at 12:54 PM David Morávek  wrote:

> Hi Constantinos,
>
> The partition discovery doesn't support topic / partition removal as this
> may easily lead to a data loss (partition removal is not even supported by
> Kafka for the same reason)
>
> Dynamically adding and removing partitions as part of a business logic is
> just not how Kafka is designed to work. You should only be adding new
> topics / partitions for scale out reasons and even that should be done
> super carefully because it breaks data partitioning.
>
> Best,
> D.
>
> On Tue, Sep 14, 2021 at 11:00 AM Constantinos Papadopoulos <
> cpa...@gmail.com> wrote:
>
>> We are on Flink 1.12.1, we initialize our FlinkKafkaConsumer with a topic
>> name *pattern*, and we have partition discovery enabled.
>>
>> When our product scales up, it adds new topics. When it scales down, it
>> removes topics.
>>
>> The problem is that the FlinkKafkaConsumer never seems to forget
>> partitions that don't exist anymore.
>>
>> As a result, our logs are filled with UNKNOWN_TOPIC_OR_PARTITION errors:
>>
>> *[Consumer clientId=consumer-metric-processor-consumer-group-2,
>> groupId=metric-processor-consumer-group] Error while fetching metadata with
>> correlation id 3030663 : {metric-athfvfrt#sgy=UNKNOWN_TOPIC_OR_PARTITION}*
>>
>> Over time, the problem becomes worse as scale ups and scale downs
>> continue happening (and thus the number of deleted partitions continues
>> increasing).
>>
>> Is this a bug, or are we missing how to get the FlinkKafkaConsumer to
>> forget partitions that don't exist anymore?
>>
>> The deleted topics are not returned by the "listTopics" API which the
>> KafkaPartitionDiscoverer calls under the covers, so it's unclear why the
>> KafkaPartitionDiscoverer doesn't then proceed to forget about these topics
>> and their partitions.
>>
>


Re: flink : Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/logging/log4j/spi/LoggerContextShutdownAware

2021-09-14 Thread Ragini Manjaiah
Hi David,
please find my pom.xml below, where I have excluded the slf4j-log4j12 dependency.
Even after excluding it I am still encountering this issue.


http://maven.apache.org/POM/4.0.0";
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
4.0.0

flinkTest
*
1.0-SNAPSHOT



1.11.3
2.11




org.apache.flink
flink-connector-elasticsearch7_2.11
1.10.0



org.apache.flink
flink-java
${flink.version}


org.apache.flink
flink-java
${flink.version}



org.apache.flink
flink-streaming-java_2.11
${flink.version}



org.apache.flink
flink-statebackend-rocksdb_${scala.version}
${flink.version}



org.apache.flink
flink-clients_${scala.version}
${flink.version}


org.apache.flink
flink-core
${flink.version}


org.apache.flink
flink-avro
${flink.version}


org.apache.flink
flink-connector-kafka-0.11_${scala.version}
${flink.version}


org.apache.flink
flink-test-utils_${scala.version}
${flink.version}


nl.basjes.parse.useragent
yauaa
1.3



com.googlecode.json-simple
json-simple
1.1


de.javakaffee
kryo-serializers
0.38


com.github.wnameless
json-flattener
0.5.0


joda-time
joda-time
2.9.1


com.google.code.gson
gson
2.2.4


org.json
json
20200518



org.apache.hadoop
hadoop-common
3.2.0


org.apache.hadoop
hadoop-mapreduce-client-core
3.2.0






spring-repo
https://repo1.maven.org/maven2/








org.apache.maven.plugins
maven-compiler-plugin
3.1

1.8
1.8








org.apache.maven.plugins
maven-shade-plugin
3.0.0



package

shade





org.apache.flink:force-shading

com.google.code.findbugs:jsr305
org.slf4j:*
log4j:*
org.slf4j:slf4j-api
org.slf4j:slf4j-log4j12
log4j:log4j






*:*

META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA






org.sapphire.watchtower.Application













maven-surefire-plugin
2.12.3


maven-failsafe-plugin
2.12.3




org.eclipse.m2e
lifecycle-mapping
1.0.0






org.apache.maven.plugins

maven-shade-plugin
[3.0.0,)

shade


 

Re: flink : Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/logging/log4j/spi/LoggerContextShutdownAware

2021-09-14 Thread David Morávek
Hi Ragini,

I think you actually have the opposite problem: your classpath contains an
slf4j binding for log4j 1.2, which is no longer supported. Can you try
getting rid of the slf4j-log4j12 dependency?

Best,
D.

On Tue, Sep 14, 2021 at 1:51 PM Ragini Manjaiah 
wrote:

> when I try to run flink .1.13 application encountering the below mentioned
> issue. what dependency I am missing . can you please help me
>
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/Users/z004t01/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/Users/z004t01/.m2/repository/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> at
> org.apache.logging.log4j.core.impl.Log4jContextFactory.createContextSelector(Log4jContextFactory.java:106)
> at
> org.apache.logging.log4j.core.impl.Log4jContextFactory.(Log4jContextFactory.java:59)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at org.apache.logging.log4j.LogManager.(LogManager.java:94)
> at
> org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:122)
> at
> org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45)
> at
> org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:46)
> at
> org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
> at
> org.apache.flink.configuration.Configuration.(Configuration.java:67)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1972)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1958)
> at java.util.Optional.orElseGet(Optional.java:267)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:1945)
> at org.sapphire.watchtower.Application.main(Application.java:63)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.logging.log4j.spi.LoggerContextShutdownAware
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 32 more
>


flink : Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/logging/log4j/spi/LoggerContextShutdownAware

2021-09-14 Thread Ragini Manjaiah
When I try to run a Flink 1.13 application I encounter the below-mentioned
issue. What dependency am I missing? Can you please help me?


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/z004t01/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/Users/z004t01/.m2/repository/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[org.apache.logging.slf4j.Log4jLoggerFactory]
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at
org.apache.logging.log4j.core.impl.Log4jContextFactory.createContextSelector(Log4jContextFactory.java:106)
at
org.apache.logging.log4j.core.impl.Log4jContextFactory.(Log4jContextFactory.java:59)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at org.apache.logging.log4j.LogManager.(LogManager.java:94)
at
org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:122)
at
org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45)
at
org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:46)
at
org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
at
org.apache.flink.configuration.Configuration.(Configuration.java:67)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1972)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1958)
at java.util.Optional.orElseGet(Optional.java:267)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:1945)
at org.sapphire.watchtower.Application.main(Application.java:63)
Caused by: java.lang.ClassNotFoundException:
org.apache.logging.log4j.spi.LoggerContextShutdownAware
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 32 more


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread David Morávek
Hi Constantinos,

The partition discovery doesn't support topic / partition removal, as this
may easily lead to data loss (partition removal is not even supported by
Kafka for the same reason).

Dynamically adding and removing partitions as part of a business logic is
just not how Kafka is designed to work. You should only be adding new
topics / partitions for scale out reasons and even that should be done
super carefully because it breaks data partitioning.

Best,
D.

On Tue, Sep 14, 2021 at 11:00 AM Constantinos Papadopoulos 
wrote:

> We are on Flink 1.12.1, we initialize our FlinkKafkaConsumer with a topic
> name *pattern*, and we have partition discovery enabled.
>
> When our product scales up, it adds new topics. When it scales down, it
> removes topics.
>
> The problem is that the FlinkKafkaConsumer never seems to forget
> partitions that don't exist anymore.
>
> As a result, our logs are filled with UNKNOWN_TOPIC_OR_PARTITION errors:
>
> *[Consumer clientId=consumer-metric-processor-consumer-group-2,
> groupId=metric-processor-consumer-group] Error while fetching metadata with
> correlation id 3030663 : {metric-athfvfrt#sgy=UNKNOWN_TOPIC_OR_PARTITION}*
>
> Over time, the problem becomes worse as scale ups and scale downs continue
> happening (and thus the number of deleted partitions continues increasing).
>
> Is this a bug, or are we missing how to get the FlinkKafkaConsumer to
> forget partitions that don't exist anymore?
>
> The deleted topics are not returned by the "listTopics" API which the
> KafkaPartitionDiscoverer calls under the covers, so it's unclear why the
> KafkaPartitionDiscoverer doesn't then proceed to forget about these topics
> and their partitions.
>


Re: RocksDB state not cleaned up

2021-09-14 Thread David Morávek
Hi Tao,

my intuition is that the compaction of SST files is not triggering. By
default, it's only triggered by the size ratios of different levels [1] and
the TTL mechanism has no effect on it.

Some reasoning from Stephan:

It's very likely to have large files in higher levels that haven't been
> compacted in a long time and thus just stay around.
>
> This might be especially possible if you insert a lot in the beginning
> (build up many levels) and then have a moderate rate of modifications, so
> the changes and expiration keep happening purely in the merges /
> compactions of the first levels. Then the later levels may stay unchanged
> for quite some time.
>

You should be able to see compaction details by setting RocksDB logging to
INFO [2]. Can you please check these and validate whether this really is
the case?

[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
[2]
https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting

Best,
D.
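
As an illustration of [2], a minimal sketch of an options factory that turns RocksDB's
native LOG back on at INFO level so compaction and flush activity becomes visible. The
class name, log directory and file limits are assumptions, not settings from this job.

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

public class VerboseRocksDbLogFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions
                .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)   // log compactions, flushes, level sizes
                .setDbLogDir("/tmp/rocksdb-logs")           // keep the LOG outside the working directory
                .setMaxLogFileSize(64 * 1024 * 1024)        // rotate at 64 MB
                .setKeepLogFileNum(4);                      // keep a handful of rotated files
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;                              // leave column family options unchanged
    }
}

The factory would then be registered on the state backend before the job starts, e.g. via
RocksDBStateBackend#setRocksDBOptions(new VerboseRocksDbLogFactory()), after which the LOG
files show which levels the compactions actually touch.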

On Mon, Sep 13, 2021 at 3:18 PM tao xiao  wrote:

> Hi team
>
> We have a job that uses value state with RocksDB and TTL set to 1 day. The
> TTL update type is OnCreateAndWrite. We set the value state when the value
> state doesn't exist and we never update it again after the state is not
> empty. The key of the value state is timestamp. My understanding of such
> TTL settings is that the size of all SST files remains flat (let's
> disregard the impact space amplification brings) after 1 day as the daily
> data volume is more or less the same. However the RocksDB native metrics
> show that the SST files have continued to grow since I started the job. I checked
> the SST files in local storage and I can see SST files that are one month old
> (from when I started the job). What is the possible reason for the SST files
> not being cleaned up?
>
> The Flink version is 1.12.1
> State backend is RocksDB with incremental checkpoint
> All default configuration for RocksDB
> Per job mode in Yarn and checkpoint to S3
>
>
> Here is the code to set value state
>
> public void open(Configuration parameters) {
> StateTtlConfig ttlConfigClick = StateTtlConfig
> .newBuilder(Time.days(1))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> 
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .cleanupInRocksdbCompactFilter(300_000)
> .build();
> ValueStateDescriptor clickStateDescriptor = new 
> ValueStateDescriptor<>("click", Click.class);
> clickStateDescriptor.enableTimeToLive(ttlConfigClick);
> clickState = getRuntimeContext().getState(clickStateDescriptor);
>
> StateTtlConfig ttlConfigAds = StateTtlConfig
> .newBuilder(Time.days(1))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> 
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .cleanupInRocksdbCompactFilter(30_000_000)
> .build();
> ValueStateDescriptor adsStateDescriptor = new 
> ValueStateDescriptor<>("ads", slimAdsClass);
> adsStateDescriptor.enableTimeToLive(ttlConfigAds);
> adsState = getRuntimeContext().getState(adsStateDescriptor);
> }
>
> @Override
> public void processElement(Tuple3 tuple, Context ctx, 
> Collector collector) throws Exception {
> if (tuple.f1 != null) {
> Click click = tuple.f1;
>
> if (clickState.value() != null) {
> return;
> }
>
> clickState.update(click);
>
> A adsFromState = adsState.value();
> if (adsFromState != null) {
> collector.collect(adsFromState);
> }
> } else {
> A ads = tuple.f2;
>
> if (adsState.value() != null) {
> return;
> }
>
> adsState.update(ads);
>
> Click clickFromState = clickState.value();
> if (clickFromState != null) {
> collector.collect(ads);
> }
> }
> }
>
>
> Here is the snippet of sst files in local storage
>
> [root@ db]# ll | head -n10
> total 76040068
> -rw-r- 1 hadoop yarn0 Aug 16 08:46 03.log
> -rw-r- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
> -rw-r- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
> -rw-r- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
> -rw-r- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
> -rw-r- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
> -rw-r- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
> -rw-r- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
> -rw-r- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
> --
> Regards,
> Tao
>


Re: FLINK-14316 happens on version 1.13.2

2021-09-14 Thread Xiangyu Su
Hi Guys,
sorry for the late reply.
We found out the issue is not related to Flink; there is a connection issue
with ZooKeeper. We deploy our whole infra on k8s using AWS spot EC2 instances;
once a pod gets restarted or a spot instance is lost, we lose the log files,
so sorry for not being able to share them.

Sharing some of our experience:
"Job leader lost leadership" issues can be caused by different reasons, most
probably ZooKeeper, and this failure does not cause a job failure as far as we
have seen.
The checkpoint timeout issue can also be due to a ZooKeeper issue, because the
last successful checkpoint's meta info is stored in ZK; if ZK has an issue,
Flink will not be able to restore from the last checkpoint.

On Tue, 7 Sept 2021 at 10:00, Matthias Pohl  wrote:

> Hi Xiangyu,
> thanks for reaching out to the community. Could you share the entire
> TaskManager and JobManager logs with us? That might help investigating
> what's going on.
>
> Best,
> Matthias
>
> On Fri, Sep 3, 2021 at 10:07 AM Xiangyu Su  wrote:
>
>> Hi Yun,
>> Thanks alot.
>> I am running a test, and facing the "Job Leader lost leadership..."
>> issue, and also the checkpointing timeout at the same time,, not sure
>> whether those 2 things related to each other.
>> regarding your question:
>> 1. GC looks ok.
>> 2. seems like once the "Job Leader lost leadership..." happens flink job
>> can not successfully get restarted.
>> and e.g here is some logs from one job failure:
>> ---
>> 2021-09-02 20:41:11,345 WARN  org.apache.flink.runtime.taskmanager.Task
>>  [] - KeyedProcess -> Sink: StatsdMetricsSink (40/48)#18
>> (9ab62cc148569e449fdb31b521ec976c) switched from RUNNING to FAILED with
>> failure cause: org.apache.flink.util.FlinkException: Disconnect from
>> JobManager responsible for ec6fd88643747aafac06ee906e421a96.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1660)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1500(TaskExecutor.java:181)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2189)
>> at java.util.Optional.ifPresent(Optional.java:159)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2187)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.Exception: Job leader for job id
>> ec6fd88643747aafac06ee906e421a96 lost leadership.
>> ... 24 more
>>
>> ---
>> 2021-09-02 20:47:22,388 ERROR
>> org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] -
>> Authentication failed
>> 2021-09-02 20:47:22,388 INFO
>>  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
>> Opening socket connection to server dpl-zookeeper-0.dpl-zookeeper/
>> 10.168.175.10:2181
>> 2021-09-02 20:47:22,388 WARN
>>  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
>> SASL configuration failed: javax.security.auth.login.LoginException: No
>> JAAS configuration section named 'Client' was found in specified JAAS
>> configuration file: '/tmp/jaas-4480663428736118963.conf'. Will continue
>> connection to Zookeeper server without SASL authentication, if Zookeeper
>> server allows it.
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [flink-dist_2.11-1.13.2.jar:1.13.2]
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [flink

Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
We are on Flink 1.12.1, we initialize our FlinkKafkaConsumer with a topic
name *pattern*, and we have partition discovery enabled.

When our product scales up, it adds new topics. When it scales down, it
removes topics.

The problem is that the FlinkKafkaConsumer never seems to forget partitions
that don't exist anymore.

As a result, our logs are filled with UNKNOWN_TOPIC_OR_PARTITION errors:

*[Consumer clientId=consumer-metric-processor-consumer-group-2,
groupId=metric-processor-consumer-group] Error while fetching metadata with
correlation id 3030663 : {metric-athfvfrt#sgy=UNKNOWN_TOPIC_OR_PARTITION}*

Over time, the problem becomes worse as scale ups and scale downs continue
happening (and thus the number of deleted partitions continues increasing).

Is this a bug, or are we missing how to get the FlinkKafkaConsumer to
forget partitions that don't exist anymore?

The deleted topics are not returned by the "listTopics" API which the
KafkaPartitionDiscoverer calls under the covers, so it's unclear why the
KafkaPartitionDiscoverer doesn't then proceed to forget about these topics
and their partitions.
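
A quick way to reproduce what the discoverer can observe is to call listTopics() on a
plain Kafka consumer: deleted topics are simply absent from the result. A small sketch
(the broker address and the "metric-" prefix are placeholder assumptions):

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class ListMetricTopics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // listTopics() returns only the topics that currently exist on the brokers.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            topics.forEach((topic, partitions) -> {
                if (topic.startsWith("metric-")) {
                    System.out.println(topic + " -> " + partitions.size() + " partitions");
                }
            });
        }
    }
}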


Re: Flink Native Kubernetes - Configuration kubernetes.flink.log.dir not working

2021-09-14 Thread Guowei Ma
Hi

Maybe you could try the `kubectl describe pod -n ${namespace} ${podname}`
to see what happened atm.

Best,
Guowei


On Tue, Sep 14, 2021 at 2:58 PM bat man  wrote:

> Hello Guowei,
>
> The pods terminate almost within a second so am unable to pull any logs.
> Is there any way I can pull the logs?
>
> Thanks,
> Hemant
>
> On Tue, Sep 14, 2021 at 12:22 PM Guowei Ma  wrote:
>
>> Hi,
>>
>> Could you share some logs when the job fails?
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Sep 13, 2021 at 10:59 PM bat man  wrote:
>>
>>> Hi,
>>>
>>> I am running a POC to evaluate Flink on Native Kubernetes. I tried
>>> changing the default log location by using the configuration -
>>> kubernetes.flink.log.dir
>>> However, the job in application mode fails after bringing up the task
>>> manager. This is the command I use -
>>>
>>> ./bin/flink run-application --target kubernetes-application
>>> -Dkubernetes.cluster-id=flink-k8s-poc-app
>>> -Dkubernetes.container.image=
>>> -Dkubernetes.flink.log.dir="/var/log/containers"
>>> local:///opt/flink/usrlib/uber.jar
>>>
>>> Is there something else which needs to be done to write logs to a
>>> different location like creating the folders in the custom image.
>>>
>>> Thanks.
>>>
>>


Re: JVM Metaspace capacity planning

2021-09-14 Thread Guowei Ma
Hi, Puneet
In general every job has its own classloader. You can find more detailed
information in the docs [1].
You can put common jars into the "lib/" directory of the Flink distribution to avoid this [2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code

Best,
Guowei


On Mon, Sep 13, 2021 at 10:06 PM Puneet Duggal 
wrote:

> Hi,
>
> Thank you for quick reply. So in my case i am using Datastream Apis.Each
> job is a real time processing engine which consumes data from kafka and
> performs some processing on top of it before ingesting into sink.
>
> JVM Metaspace size earlier set was around 256MB (default) which i had to
> increase to 3GB so that ~30 parallel jobs can run simultaneously on single
> task manager.
>
> Regards,
> Puneet
>
> On 13-Sep-2021, at 5:46 PM, Caizhi Weng  wrote:
>
> Hi!
>
> Which API are you using? The datastream API or the Table / SQL API? If it
> is the Table / SQL API then some Java classes for some operators (for
> example aggregations, projection, filter, etc.) will be generated when
> compiling user code to executable Java code. These Java classes are new to
> the JVM. So if you're running too many jobs in the same Flink cluster a
> metaspace OOM might occur. There is already a JIRA ticket for this [1].
>
> I don't know much about the behavior of class loaders, so I'll wait for
> others to apply in this aspect.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15024
>
> Puneet Duggal  于2021年9月13日周一 下午7:49写道:
>
>> Hi,
>>
>> So on going through multiple resources, got basic idea that JVM Metaspace
>> is used by flink class loader to load class metadata which is used to
>> create objects in heap. Also this is a one time activity since all the
>> objects of single class require single class metadata object in JVM
>> Metaspace.
>>
>> But while deploying multiple jobs on task manager, i saw almost linear
>> increase in consumption of metaspace (irrespective of parallelism). Even if
>> those multiple jobs have exactly same implementation. So wanted to confirm
>> if each job in flink has its own class loader which loads required classes
>> in Task Manager JVM Metaspace.
>>
>> PS: Any documentation for this will be of great help.
>>
>> Thanks,
>> Puneet
>
>
>


Re: Flink-Zookeeper Security

2021-09-14 Thread David Morávek
Hi Beata,

you need to upgrade the ZooKeeper client to 3.5+ on the Flink side [1] and set the JVM
properties the same way you would with a vanilla ZK client. Please refer to
the following thread [2] for more details. It would be great if you could
provide feedback for future reference on whether this approach has worked for
you.

[1] https://issues.apache.org/jira/browse/ZOOKEEPER-2125
[2]
https://lists.apache.org/x/thread.html/ra416ad4d5d27e4dfafe1bafe420027f01ea25c84a0036f852f13bf5d@%3Cuser.flink.apache.org%3E

Best,
D.

On Mon, Sep 13, 2021 at 1:21 PM Beata Szymanowska 
wrote:

> Hi!
> I am struggling to find the answer to the question of whether it is
> possible to connect Flink to a ZooKeeper cluster secured with a TLS certificate.
>
> All the Best,
> Beata Sz.
>