How to put external data in EmbeddedRocksDB

2022-01-28 Thread Surendra Lalwani
Hi Team,

I am working on a solution where we need to perform some lookups from a Flink
job. Earlier we were using Redis and calling it through a UDF from the Flink
job, but now we are planning to remove the external Redis dependency and use
the embedded RocksDB as the lookup store. Does anyone have an idea of how this
can be done? Any other solution would also be appreciated.

Thanks and Regards,
Surendra Lalwani
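
One pattern that fits this setup is to stream the reference data into the job as a
second input and keep it in Flink keyed state, which lives in RocksDB once the
EmbeddedRocksDBStateBackend is configured, so the lookup becomes a local state read
instead of a Redis call. The sketch below only illustrates that idea; the Tuple2
element types, the String key, and the class name are placeholders, not part of the
original question.

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Both inputs are keyed by the lookup key (f0); f1 carries the payload.
public class StateBackedLookup
        extends KeyedCoProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>, String> {

    private transient ValueState<String> referenceValue;

    @Override
    public void open(Configuration parameters) {
        // Keyed state is held in RocksDB when the EmbeddedRocksDBStateBackend is configured.
        referenceValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("reference", Types.STRING));
    }

    @Override
    public void processElement1(Tuple2<String, String> mainRecord, Context ctx, Collector<String> out)
            throws Exception {
        // The lookup is a local state read instead of a call to Redis.
        String ref = referenceValue.value();
        out.collect(mainRecord.f1 + " enriched with " + (ref != null ? ref : "<no reference yet>"));
    }

    @Override
    public void processElement2(Tuple2<String, String> referenceRecord, Context ctx, Collector<String> out)
            throws Exception {
        // Updates on the reference stream keep the lookup data fresh.
        referenceValue.update(referenceRecord.f1);
    }
}
```

Wiring it up would look like `mainStream.keyBy(t -> t.f0).connect(referenceStream.keyBy(t -> t.f0)).process(new StateBackedLookup())`, with `state.backend: rocksdb` (or `env.setStateBackend(new EmbeddedRocksDBStateBackend())`) so the state actually ends up in RocksDB.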



Re: Log4j2 Issues

2022-01-28 Thread Clayton Wohl
Sorry, never mind. I see the Flink operator overrides this with JVM argument
configuration.

JVM Options:
   -Dlog.file=/opt/flink/log/flink--client-4b57ba2b2597.log
   -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
   -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties
   -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml

That all makes sense. Thank you.

On Fri, Jan 28, 2022 at 4:02 PM Clayton Wohl  wrote:

> When I run my Flink app via the Google Flink Operator, log4j2 logging
> doesn't show up in logs. System.out.println messages work.
>
> Everything should be very plain-vanilla-standard setup. I have a
> log4j2.xml config file in my classpath included in my application .jar
> file. I'm building a custom Docker image based on
> `flink:1.14.3-scala_2.12-java11` that simply copies my application .jar and
> all transitive dependencies to `/opt/flink/lib`. My app is built with the
> same `org.apache.logging.log4j:log4j-core:2.17.1` dependency that Flink
> 1.14.3 uses.
>
> If I shell into my docker image and run `java -cp "/opt/flink/lib/*"
> myapp.LoggingTest`, the logging works, I see it on the console. When I
> submit this via the Google Flink Kubernetes Operator, I don't see the
> logging output, but I do see println output.
>
> If I change my app to use logback, I see that output, which is weird. How
> do I troubleshoot the log4j2 option? Or should I just use logback if that
> works? For a long running Flink job, I just want to see startup+config
> logs + exception logs.
>
> thank you :)
>


Log4j2 Issues

2022-01-28 Thread Clayton Wohl
When I run my Flink app via the Google Flink Operator, log4j2 logging
doesn't show up in logs. System.out.println messages work.

Everything should be very plain-vanilla-standard setup. I have a log4j2.xml
config file in my classpath included in my application .jar file. I'm
building a custom Docker image based on `flink:1.14.3-scala_2.12-java11`
that simply copies my application .jar and all transitive dependencies to
`/opt/flink/lib`. My app is built with the same
`org.apache.logging.log4j:log4j-core:2.17.1` dependency that Flink 1.14.3
uses.

If I shell into my docker image and run `java -cp "/opt/flink/lib/*"
myapp.LoggingTest`, the logging works, I see it on the console. When I
submit this via the Google Flink Kubernetes Operator, I don't see the
logging output, but I do see println output.

If I change my app to use logback, I see that output, which is weird. How
do I troubleshoot the log4j2 option? Or should I just use logback if that
works? For a long running Flink job, I just want to see startup+config
logs + exception logs.

thank you :)


EOS from checkpoints doesn't seem to work

2022-01-28 Thread Cristian Constantinescu
Hi everyone,

From the mailing list, I see this question asked a lot. But I can't seem to
find a solution to my problem. I would appreciate some help.

The requirement for our project is that we do not lose data, and not
produce duplicate records. Our pipelines are written with Apache Beam
(2.35.0) and run on a single Flink (1.13.1) node (for now, while we
transition to Kubernetes).

We read a topic from Kafka, join it with another topic, and output the
result in a third topic. When we introduce an exception artificially into
the pipeline (by listening to a debug topic that throws an exception on
every message it gets), we observe that restarting the pipeline from the
last checkpoint does not pick up where it left off. I'm not really sure
why...

On the Beam side, the pipeline is configured with
.withReadCommitted().commitOffsetsInFinalize() and enable.auto.commit is
set to false for the consumer, and with .withEOS(1, "sink-group-name"). On
the Flink side, --externalizedCheckpointsEnabled is set to true, and
--checkpointInterval is set to 1 minute.

I let the pipeline run for 4 checkpoints. Between checkpoint #2 and #3, I
observe that the kafka consumer group of the main topic which started from
the start of the topic has already reached the end. I trigger the exception
between checkpoint #4 and #5 and the pipeline stops because
--numberOfExecutionRetries=2. When I restart the pipeline and specify the
metadata file in the chk-4 directory, I would expect the pipeline to
continue processing the items still pending to be processed (estimated to
about 80k of them) after checkpoint #4. Unfortunately, no item is
processed. Nothing is read from kafka. The pipeline just sits around
waiting for new messages in the main topic.

Could anyone help me figure out what's going on? I'm sure it's a user
mistake, but I'm unsure how to debug it.

Cheers,
Cristian


Re: Is Scala the best language for Flink?

2022-01-28 Thread sri hari kali charan Tummala
Yes, Scala is the best.

On Fri, Jan 28, 2022, 9:57 AM Nicolás Ferrario 
wrote:

> Hi Seb.
>
> In my team we are migrating things to Kotlin because we find it much
> easier to deal with. It's like the best of both worlds, but you do give up
> on Flink Scala serializers, since the only way to get Kotlin Data Classes
> working is by making them a POJO (or implementing your own TypeInfo). You
> do at least gain Evolution support.
>
> We've found that Kotlin + Gradle are a great combination, and we end up
> with quick compile times and a much simpler build config (compared to Maven
> and SBT).
> Another thing is that we've found that the macro createTypeInformation can
> be dangerous, because sometimes it may fall back to Kryo on classes you
> don't expect it to. Those implicit behaviors we see in Scala have caused us
> some headaches and we prefer to explicitly type things in the Data Stream.
> Just personal preference, but pointing it out in case it's useful to
> someone.
>
> Hope this helps!
>
> On Mon, Jan 24, 2022 at 7:15 AM seb  wrote:
>
>> Hi there,
>>
>> I am getting started with Apache Flink. I am curious whether there is a
>> clear winner between developing in either Scala or Java.
>>
>> It sounds like Flink is typically slower to support new versions of Scala
>> and that Java development might have fewer quirks.
>>
>> What do you think? I have experience coding in Scala, but I am more than
>> happy to learn Java.
>>
>> Thanks in advance for sharing your thoughts!
>>
>> Best,
>> Sebastian
>>
>


Re: Is Scala the best language for Flink?

2022-01-28 Thread Nicolás Ferrario
Hi Seb.

In my team we are migrating things to Kotlin because we find it much easier
to deal with. It's like the best of both worlds, but you do give up on
Flink Scala serializers, since the only way to get Kotlin Data Classes
working is by making them a POJO (or implementing your own TypeInfo). You
do at least gain Evolution support.

We've found that Kotlin + Gradle are a great combination, and we end up
with quick compile times and a much simpler build config (compared to Maven
and SBT).
Another thing is that we've found that the macro createTypeInformation can
be dangerous, because sometimes it may fall back to Kryo on classes you
don't expect it to. Those implicit behaviors we see in Scala have caused us
some headaches and we prefer to explicitly type things in the Data Stream.
Just personal preference, but pointing it out in case it's useful to
someone.

Hope this helps!

On Mon, Jan 24, 2022 at 7:15 AM seb  wrote:

> Hi there,
>
> I am getting started with Apache Flink. I am curious whether there is a
> clear winner between developing in either Scala or Java.
>
> It sounds like Flink is typically slower to support new versions of Scala
> and that Java development might have fewer quirks.
>
> What do you think? I have experience coding in Scala, but I am more than
> happy to learn Java.
>
> Thanks in advance for sharing your thoughts!
>
> Best,
> Sebastian
>


MAP data type (PyFlink)

2022-01-28 Thread Philippe Rigaux
Hello

I want to send and receive dict Python values. According to the PyFlink doc, 
this is specified with Types.MAP(). Unfortunately I 
found no example of the required arguments,  and I am stuck with the following 
error:

TypeError: MAP() missing 2 required positional arguments: 'key_type_info' and 
'value_type_info'

How should I specify, for instance, the type for {‘url’: ‘’, ‘count’: 2}?

Thanks for your help.

Philippe



Re: Unbounded streaming with table API and large json as one of the columns

2022-01-28 Thread HG
Thanks

On Fri, Jan 28, 2022, 07:47 Caizhi Weng  wrote:

> Hi!
>
> This job will work as long as your SQL statement is valid. Did you meet
> some difficulties? Or what is your concern? A record of 100K is sort of
> large, but I've seen quite a lot of jobs with such record size so it is OK.
>
> HG  于2022年1月27日周四 02:57写道:
>
>> Hi,
>>
>> I need to calculate elapsed times between steps of a transaction.
>> Each step is an event. All steps belonging to a single transaction have
>> the same transaction id. Every event has a handling time.
>> All information is part of a large JSON structure.
>> But I can have the incoming source supply transactionId and handlingTime
>> separately.
>> That would save me retrieving the windowingKey = transactionID and
>> handlingTime out of the nested JSON.
>> Basically, I want to use the SQL API to do:
>>
>> select transactionId
>>, handlingTime - previousHandlingTime as elapsedTime
>>, largeJSON from (
>>   select  transactionId
>>   , handlingTime
>>   , lag(handlingTime) over (partition by transactionID order by
>> handlingTime)  as previousHandlingTime
>>   , largeJSON
>>   from source
>> )
>>
>> The largeJSON can be about 100K.
>> Would this work?
>>
>> Regards Hans-Peter
>>
>>


Flink test late elements

2022-01-28 Thread Dario Heinisch

Hey there,

Hope everyone is well!

I have a question:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

DataStream<Integer> dataStream = env.addSource(new CustomSource());

OutputTag<Integer> outputTag = new OutputTag<Integer>("late") {};

WatermarkStrategy<Integer> waStrat = WatermarkStrategy
        .<Integer>forMonotonousTimestamps()
        .withTimestampAssigner((i, timestamp) -> Long.valueOf(i));

SingleOutputStreamOperator<Integer> windowOperator =
        dataStream
                .assignTimestampsAndWatermarks(waStrat)
                .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1)))
                .sideOutputLateData(outputTag)
                .apply(
                        new AllWindowFunction<Integer, Integer, TimeWindow>() {

                            @Override
                            public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
                                System.out.println(window.getStart() + " -> " + window.getEnd());
                                for (Integer val : values) {
                                    System.out.println(val + " in window");
                                }
                            }
                        });

windowOperator
        .getSideOutput(outputTag)
        .flatMap(
                new FlatMapFunction<Integer, Integer>() {
                    @Override
                    public void flatMap(Integer value, Collector<Integer> out) {
                        System.out.println("LATE: " + value);
                    }
                });

env.execute();
```

And my custom source:
```
static class CustomSource implements SourceFunction<Integer> {

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        int i = 0;
        while (i++ < 10) {
            Thread.sleep(100);
            if (i < 5) {
                ctx.collect(i * 10);
            } else {
                ctx.collect(i);
            }
        }
    }

    @Override
    public void cancel() {
    }
}
```

Now in the source if I leave `Thread.sleep(100)` in there I get the 
following output:

```
10 -> 11
10 in window
20 -> 21
20 in window
LATE: 5
30 -> 31
30 in window
LATE: 6
LATE: 7
LATE: 8
LATE: 9
LATE: 10
40 -> 41
40 in window
```

If I comment out the sleep, I get:
```
5 -> 6
5 in window
6 -> 7
6 in window
7 -> 8
7 in window
8 -> 9
8 in window
9 -> 10
9 in window
10 -> 11
10 in window
10 in window
20 -> 21
20 in window
30 -> 31
30 in window
40 -> 41
40 in window
```

I thought that by using `forMonotonousTimestamps` it would automatically drop
the late events from the window and emit them to the side output, since
the first 4 elements have a higher timestamp than the last 5. But it seems
it evaluates all elements emitted by the source at once and then emits
the watermark after one batch of events from the source has been processed.


I assume there is a better approach than sleeping between collects in the
source to get the expected result?


(The background here is that I want to write tests for some event-time
processing and assert that the side output is used for late events and that
these are processed correctly. In production the Kafka source will not always
emit data continuously, hence I want to test this, but I have not found
anything better than sleeping. Maybe that is the correct approach?)


Best regards,

Dario
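
One way to make a test like this deterministic without sleeping is to let the test
source assign timestamps and emit watermarks itself via
SourceContext#collectWithTimestamp and SourceContext#emitWatermark, and to drop the
assignTimestampsAndWatermarks call so the source's watermarks are not overwritten. A
rough sketch along the lines of the CustomSource above; the element values and
timestamps are only illustrative:

```
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// Test source that controls event time explicitly: it emits the "future" elements
// first, advances the watermark past them, and then emits elements whose timestamps
// are behind the watermark so they are guaranteed to be late.
class DeterministicLateSource implements SourceFunction<Integer> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        // On-time elements 10, 20, 30, 40 with matching timestamps.
        for (int i = 1; i < 5 && running; i++) {
            ctx.collectWithTimestamp(i * 10, i * 10L);
        }
        // Advance event time; any later element with a smaller timestamp is late.
        ctx.emitWatermark(new Watermark(40L));
        // Late elements 5..10, whose timestamps are behind the watermark.
        for (int i = 5; i <= 10 && running; i++) {
            ctx.collectWithTimestamp(i, (long) i);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```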



Re: Resolving a CatalogTable

2022-01-28 Thread Timo Walther

Hi Balazs,

you are right, the new APIs only allow the serialization of resolved 
instances. This ensures that only validated, correct instances are put 
into the persistent storage such as a database. The framework will 
always provide resolved instances and call the corresponding methods 
with those. They should be easily serializable.


However, when reading from a persistent storage such as a database, the 
framework needs to validate the input and resolve expressions and data 
types (e.g. from a string representation).


The new design reflects the reality better. A catalog implementation 
does not need to be symmetric. It follows the principle:


- "Resolved" into the catalog (with all information if implementers need it)
- "Unresolved" out of the catalog (let the framework deal with the 
resolution, also with cross references to other catalogs)



Use ResolvedCatalogTable#toProperties for putting basic info into your 
database.


Use CatalogTable#fromProperties to restore the table.
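
A minimal sketch of that asymmetric round trip, assuming the Flink 1.14 catalog 
APIs and with an in-memory map standing in for the real database:

```
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;

class PropertiesBackedTableStore {

    // Stand-in for the external database; a real catalog would use persistent storage.
    private final Map<String, Map<String, String>> store = new HashMap<>();

    void persist(ObjectPath path, CatalogBaseTable table) {
        // The framework always hands the catalog a resolved instance.
        ResolvedCatalogTable resolved = (ResolvedCatalogTable) table;
        store.put(path.getFullName(), resolved.toProperties());
    }

    CatalogBaseTable restore(ObjectPath path) {
        // Returning the unresolved CatalogTable is enough; the framework resolves it,
        // including expressions for computed columns and watermarks.
        return CatalogTable.fromProperties(store.get(path.getFullName()));
    }
}
```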

This is especially important for expression resolution of computed columns and 
watermark strategies. Functions could come from other catalogs as well.


So for implementers it is usually not important to resolve the 
`CatalogTable` manually.


If it is important for you, maybe you can elaborate a bit on your use case?

Regards,
Timo


On 26.01.22 12:18, Balázs Varga wrote:

Hi everyone,

I'm trying to migrate from the old set of CatalogTable related APIs 
(CatalogTableImpl, TableSchema, DescriptorProperties) to the new ones 
(CatalogBaseTable, Schema and ResolvedSchema, CatalogPropertiesUtil), in 
a custom catalog.


The catalog stores table definitions, and the current logic involves 
persisting the
schema from a CatalogBaseTable to a database. When we get a table, its 
definition is read from the database and the CatalogTable is built up 
and returned.


For this, we currently serialize the schema like this:
descriptorProperties.putTableSchema(Schema.SCHEMA, 
catalogBaseTable.getSchema());


The new API seems to intentionally only allow the serialization of the 
Resolved version of objects (e.g. ResolvedCatalogTable, ResolvedSchema).


1. Could you please clarify why this limitation was put into place? It 
seems to me that it would
be sufficient to resolve the CatalogTables once we are actually trying 
to pass the table to the DynamicTableFactory.


2. What additional information is gained during the resolution of a 
CatalogTable, and where does that information come from? Are there some 
references to things in other catalogs?


3. Is it possible to "manually" resolve a CatalogTable? (invoke 
something like what the internal DefaultSchemaResolver does). What 
context is required?


Thanks,
Balazs





Socket stream source in Python?

2022-01-28 Thread Philippe Rigaux
Hi there

I would like to use a socket stream as input for my Flink workflow in Python. 
This works in Scala with the socketTextStream() method, for instance

val stream = senv.socketTextStream("localhost", 9000, '\n')

I cannot find an equivalent in PyFlink, although it is briefly mentioned in the 
documentation. 

Any help is much appreciated.

Philippe



Re: Example for Jackson JsonNode Kafka serialization schema

2022-01-28 Thread Robert Metzger
Hi Oran,

as you've already suggested, you could just use a (flat)map function that
takes an ObjectNode and outputs a string.
In the mapper, you can do whatever you want in case of an invalid object:
logging about it, discarding it, writing an "error json string", writing to
a side output stream, ...
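
A minimal sketch of that approach, assuming JsonNode input elements; the comment in
the catch block is a placeholder for whatever error handling is chosen:

```
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Serialize to String in a flatMap so invalid records can be handled explicitly,
// and hand plain strings to the Kafka sink.
public class JsonNodeToStringFunction extends RichFlatMapFunction<JsonNode, String> {

    private transient ObjectMapper objectMapper;

    @Override
    public void open(Configuration parameters) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public void flatMap(JsonNode element, Collector<String> out) {
        try {
            out.collect(objectMapper.writeValueAsString(element));
        } catch (JsonProcessingException e) {
            // Invalid object: log and drop here, or route it to a side output /
            // error topic instead of discarding it silently.
        }
    }
}
```

If the target topic still has to be derived from the record, it can be extracted in
the same flatMap and carried alongside the string (e.g. in a Tuple2) instead of
re-parsing the string downstream.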


On Tue, Jan 25, 2022 at 12:38 PM Oran Shuster 
wrote:

> In the documentation we have an example on how to implement
> deserialization from bytes to Jackson ObjectNode objects
> - JSONKeyValueDeserializationSchema
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
>
> However, there is no example on the other direction: Taking an
> ObjectNode/JsonNode (or just any POJO class) and using Jackson to serialize
> it to string
>
> You can write a simple schema like so
>
>
> public class JSONKafkaSerializationSchema implements
> KafkaSerializationSchema<JsonNode> {
> private final ObjectMapper objectMapper = new ObjectMapper();
>
> @Override
> public ProducerRecord<byte[], byte[]> serialize(JsonNode element,
> @Nullable Long timestamp) {
> String topic = getTargetTopic(element);
>
> byte[] value;
>
> try {
> value = objectMapper.writeValueAsBytes(element);
> return new ProducerRecord<>(topic, value);
> } catch (JsonProcessingException e) {
> return null;
> }
> }
>
> private String getTargetTopic(JsonNode jsonNode) {
> return jsonNode.get("topic").asText();
> }
> }
>
> But this raises a question - What to do when a serialization fails?
> if the input class is a simple POJO then Jackson should always succeed in
> converting to bytes but that's not 100% guaranteed.
> In case of failures, can we return null and the record will be discarded?
> Null values are discarded in the case of the deserialization schema, from
> the documentation - "Returns: The deserialized message as an object (null
> if the message cannot be deserialized)."
> If this is not possible, what is the proper way to serialize Jackson
> objects into bytes in Flink? It's possible to convert everything to String
> before the Kafka producer, but then any logic to determine the topic we need
> to send to will need to deserialize the string again.
>


Re: Upgrade to 1.14.3

2022-01-28 Thread Robert Metzger
Hi Sweta,

yes, you cannot run a Flink job compiled against Flink 1.13 against a
1.14 cluster. But if you are only using stable APIs of Flink, you should be
able to compile your job with the 1.14 dependencies without touching the
code.

See also:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/upgrading/

> - How do I upgrade to the 1.14.3 cluster without losing running apps' state?
I have even tried doing a savepoint, but that did not revive the job.

Using a savepoint created with a job compiled against Flink 1.13, and
restoring it with a job compiled against 1.14 is the recommended approach.


On Tue, Jan 25, 2022 at 3:37 PM Sweta Kalakuntla 
wrote:

> Hi Ingo,
>
> So basically, I cannot deploy an older version of the Flink job in a 1.14.3
> Flink cluster, is that right?
>
> Thanks,
> Sweta
>
> On Tue, Jan 25, 2022 at 4:02 AM Ingo Bürk  wrote:
>
>> Hi Sweta,
>>
>> there was a non-compatible change to SourceReaderContext#metricGroup in
>> the 1.14.x release line; I assume this is what you are seeing.
>>
>> Did you make sure to update the connector (and any other) dependencies
>> as well?
>>
>>
>> Best
>> Ingo
>>
>> On 25.01.22 05:36, Sweta Kalakuntla wrote:
>> > Hi,
>> >
>> > We are on Flink 1.13.3 and trying to upgrade the cluster to the 1.14.3
>> > version. I see that the job (on 1.13.3) is unable to start up because it
>> says
>> > it couldn't find the metrics group (inside the FlinkKafkaConsumer class).
>> >
>> > - can I deploy 1.13.3 job on 1.14.3 cluster?
>> > - can I deploy 1.14.3 job on 1.13.3 cluster?
>> > - How do I upgrade to the 1.14.3 cluster without losing running apps
>> state?
>> > I have even tried doing savepoint that did not revive the job.
>> >
>> > Thank you,
>> > Sweta
>>
>


Re: IllegalArgumentException: URI is not hierarchical error when initializating jobmanager in cluster

2022-01-28 Thread Robert Metzger
Hi Javier,

I suspect that TwitterServer is using some classloading / dependency
injection / service loading "magic" that is causing this.
I would try to find out, either by attaching a remote debugger (should be
possible when executing in cluster mode locally) or by adding log
statements in the code, what the URI it's trying to load looks like.

On the cluster, Flink is using separate classloaders for the base flink
system, and the user code (as opposed to executing in the IDE, where
everything is loaded from the same loader). Check out this page and try out
the config arguments:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/



On Wed, Jan 26, 2022 at 4:13 AM Javier Vegas  wrote:

> I am porting a Scala service to Flink in order to make it more scalable
> via running it in a cluster. All my Scala services extend a base Service
> class that extends TwitterServer (
> https://github.com/twitter/twitter-server/blob/develop/server/src/main/scala/com/twitter/server/TwitterServer.scala)
> and that base class contains a lot of logic about resource initialization,
> logging, stats and error handling, monitoring, etc that I want to keep
> using in my class. I ported my logic to Flink sources and sinks, and
> everything worked fine when I ran my class in single JVM mode either from
> sbt or my IDE; Flink's jobmanager and taskmanagers start and run my app.
> But when I try to run my application in cluster mode, when launching my
> class with "./bin/standalone-job.sh start --job-classname" the jobmanager
> runs into a "IllegalArgumentException: URI is not hierarchical" error on
> initialization, apparently because TwitterServer is trying to load
> something from the class path (see attached full log).
>
> Is there anything I can do to run a class that extends TwitterServer in a
> Flink cluster? I have tried making my class not extend it and it worked
> fine, but I really want to keep using all the common infrastructure logic
> that I have in my base class that extends TwitterServer.
>
> Thanks!
>


Re: Duplicate job submission error

2022-01-28 Thread Robert Metzger
Hi Parag,

it seems that you are submitting a job with the same job id multiple times.
An easy fix would be generating a new job id each time you are submitting
the job.

To debug this: check out the Flink jobmanager logs, there are log messages
for every job submission.


On Thu, Jan 27, 2022 at 9:16 AM Parag Somani  wrote:

> Hello All,
>
> While deploying on one of our environments, we encountered a CrashLoopBackOff
> of the job manager pod.
> Env: K8s
> Flink: 1.14.2
>
> Could you suggest how we can troubleshoot this and possibly handle it?
>
>
> Exception snippet as follows:
>
> 2022-01-27 06:58:07.326 ERROR 44 --- [lt-dispatcher-4]
> c.b.a.his.service.FlinkExecutorService   : Failed to execute job
>
> org.apache.flink.util.FlinkException: Failed to execute job 'events rates
> calculation'.
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2056)
> ~[flink-streaming-java_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
> ~[flink-clients_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> ~[flink-clients_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917)
> ~[flink-streaming-java_2.12-1.14.2.jar:1.14.2]
> at
> com.bmc.ade.his.service.FlinkExecutorService.init(FlinkExecutorService.java:37)
> ~[health-service-1.0.00.jar:1.0.00]
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[na:na]
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[na:na]
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[na:na]
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> ~[na:na]
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:440)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1796)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:944)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)
> ~[spring-context-5.3.10.jar:5.3.10]
> at
> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)
> ~[spring-context-5.3.10.jar:5.3.10]
> at
> org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754)
> ~[spring-boot-2.5.5.jar:2.5.5]
> at
> org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434)
> ~[spring-boot-2.5.5.jar:2.5.5]
> at
> 

Re: Inaccurate checkpoint trigger time

2022-01-28 Thread Robert Metzger
Hi Paul,

where are you storing your checkpoints, and what's their size?

IIRC, Flink won't trigger a new checkpoint before the old ones have been
cleaned up, and if your checkpoints are large and stored on S3, it can take
a while to clean them up (especially with the Hadoop S3 plugin, using
presto s3 is faster).




On Thu, Jan 27, 2022 at 10:56 AM Paul Lam  wrote:

> Hi Yun,
>
> Sorry for the late reply. I finally found some time to investigate this
> problem further. I upgraded the job to 1.14.0, but it’s still the same.
>
> I’ve checked the debug logs, and I found that Zookeeper notifies watched
> event of checkpoint id changes very late [1]. Each time a checkpoint
> finished, it would take minutes before the Zookeeper client notices the
> checkpoint ID is changed.
>
> I suspect the checkpoint coordinator is blocking on incrementing
> checkpoint ID on Zookeeper [2]. But with no luck, there’s no many relevant
> logs can help me prove that.
>
> What do you think of this? Thanks a lot!
>
> [1] https://gist.github.com/link3280/5072a054a43b40ba28891837a8fdf995
> [2]
> https://github.com/apache/flink/blob/90e850301e672fc0da293abc55eb446f7ec68ffa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L743
>
> Best,
> Paul Lam
>
> 2021年11月23日 16:49,Paul Lam  写道:
>
> Hi Yun,
>
> Thanks a lot for your pointers! I’ll try it out as you suggested and then
> get back to you.
>
> Best,
> Paul Lam
>
> 2021年11月23日 16:32,Yun Tang  写道:
>
> Hi Paul,
>
> This is really weird, from what I know, flink-1.11.0 has a problem of
> handling min-pause time [1] and this should be resolved in flink-1.12.1.
>
> Could you open the debug log level for org.apache.flink.runtime.checkpoint
> and use jmap or byteman to get the field value
> of CheckpointCoordinator#lastCheckpointCompletionRelativeTime, 
> CheckpointRequestDecider#minPauseBetweenCheckpoints
> and SystemClock#relativeTimeMillis in method 
> CheckpointRequestDecider#nextTriggerDelayMillis
> [2] to see any unexpected behavior.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-18856
> [2]
> https://github.com/apache/flink/blob/90e850301e672fc0da293abc55eb446f7ec68ffa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L182
>
>
> Best
> Yun Tang
>
> --
> *From:* Paul Lam 
> *Sent:* Tuesday, November 23, 2021 14:35
> *To:* user 
> *Subject:* Inaccurate checkpoint trigger time
>
> Hi,
>
> Recently I’ve noticed a job has nondeterministic checkpoint trigger time.
>
> The job is using Flink 1.12.1 with FsStateBackend and is of 650
> parallelism. It was configured to trigger checkpoint every 150 seconds with
> 0 pause time and no concurrent checkpoints. However there’re obvious errors
> in the checkpoint trigger times, as the actual interval may vary from 30
> seconds to 6 minutes.
>
> The jobmanager logs are good, and no error logs are found. Some of the
> output are as follow:
>
> 2021-11-23 13:51:46,438 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1446 for job f432b8d90859db54f7a79ff29a563ee4 (47142264825 bytes
> in 22166 ms).
> 2021-11-23 13:57:21,021 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1447 (type=CHECKPOINT) @ 1637647040653 for job
> f432b8d90859db54f7a79ff29a563ee4.
> 2021-11-23 13:57:43,761 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1447 for job f432b8d90859db54f7a79ff29a563ee4 (46563195101 bytes
> in 21813 ms).
> 2021-11-23 13:59:09,387 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1448 (type=CHECKPOINT) @ 1637647149157 for job
> f432b8d90859db54f7a79ff29a563ee4.
> 2021-11-23 13:59:31,370 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1448 for job f432b8d90859db54f7a79ff29a563ee4 (45543757702 bytes
> in 20354 ms).
> 2021-11-23 14:06:37,916 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1449 (type=CHECKPOINT) @ 1637647597704 for job
> f432b8d90859db54f7a79ff29a563ee4.
> 2021-11-23 14:07:03,157 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1449 for job f432b8d90859db54f7a79ff29a563ee4 (45662471025 bytes
> in 23779 ms).
> 2021-11-23 14:07:05,838 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1450 (type=CHECKPOINT) @ 1637647625640 for job
> f432b8d90859db54f7a79ff29a563ee4.
> 2021-11-23 14:07:30,748 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1450 for job f432b8d90859db54f7a79ff29a563ee4 (46916136024 bytes
> in 22998 ms).
> 2021-11-23 14:13:09,089 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1451 (type=CHECKPOINT) @ 1637647988831 for job
> f432b8d90859db54f7a79ff29a563ee4.
> 2021-11-23 14:13:38,411 INFO
> 

Re: Determinism of interval joins

2022-01-28 Thread Robert Metzger
Instead of using `reinterpretAsKeyedStream`, can you use keyBy and see if
the behavior becomes deterministic?
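
For reference, a self-contained sketch of what that change looks like; the
Tuple2<key, timestamp> element type, the sample elements, and the monotonous
watermark strategy are assumptions for illustration, not taken from the original
pipeline:

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ExplicitKeyByIntervalJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<Tuple2<String, Long>> strategy = WatermarkStrategy
                .<Tuple2<String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner((e, ts) -> e.f1);

        DataStream<Tuple2<String, Long>> left = env
                .fromElements(Tuple2.of("a", 10_000L), Tuple2.of("a", 20_000L))
                .assignTimestampsAndWatermarks(strategy);
        DataStream<Tuple2<String, Long>> right = env
                .fromElements(Tuple2.of("a", 9_000L), Tuple2.of("a", 19_000L))
                .assignTimestampsAndWatermarks(strategy);

        left.keyBy(e -> e.f0)                          // explicit keyBy on the left input
                .intervalJoin(right.keyBy(e -> e.f0))  // and on the right input
                .between(Time.minutes(-10L), Time.milliseconds(0L))
                .lowerBoundExclusive()
                .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Long> l, Tuple2<String, Long> r,
                                               Context ctx, Collector<String> out) {
                        out.collect(l + " joined with " + r);
                    }
                })
                .print();

        env.execute();
    }
}
```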

On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> I'm not sure if the issue in [1] is relevant since it mentions the Table
> API, but it could be. Since stream1 and stream2 in my example have a long
> chain of operators behind, I presume they might "run" at very different
> paces.
>
> Oh and, in the context of my unit tests, watermarks should be
> deterministic, the input file is sorted, and the watermark strategies
> should essentially behave like the monotonous generator.
>
> [1] https://issues.apache.org/jira/browse/FLINK-24466
>
> Regards,
> Alexis.
>
> --
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Thursday, January 27, 2022 1:30 PM
> *To:* user@flink.apache.org 
> *Subject:* Determinism of interval joins
>
>
> Hi everyone,
>
>
>
> I’m seeing a lack of determinism in unit tests when using an interval
> join. I am testing with both Flink 1.11.3 and 1.14.3 and the relevant bits
> of my pipeline look a bit like this:
>
>
>
> keySelector1 = …
>
> keySelector2 = …
>
>
>
> rightStream = stream1
>
>   .flatMap(…)
>
>   .keyBy(keySelector1)
>
>   .assignTimestampsAndWatermarks(strategy1)
>
>
>
> leftStream = stream2
>
>   .keyBy(keySelector2)
>
>   .assignTimestampsAndWatermarks(strategy2)
>
>
>
> joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream,
> keySelector2)
>
>   .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream,
> keySelector1))
>
>   .between(Time.minutes(-10L), Time.milliseconds(0L))
>
>   .lowerBoundExclusive()
>
>   .process(new IntervalJoinFunction(…))
>
>
>
> ---
>
>
>
> In my tests, I have a bounded source that loads demo data from a file and
> simulates the stream with a sink that collects results in memory. In the
> specific case of my IntervalJoinFunction, I’m seeing that it’s called a
> different amount of times in a non-deterministic way, sometimes I see 14
> calls to its processElement() method, others 8, others none at all and my
> output is empty; I count this by checking my logs with some tracing.
>
>
>
> Does anyone know why this is? Maybe I’m doing something wrong,
> particularly with reinterpretAsKeyedStream.
>
>
>
> Regards,
>
> Alexis.
>
>
>


Re: Stack Overflow Question - Deserialization schema for multiple topics

2022-01-28 Thread David Anderson
For questions like this one, please address them to either Stack Overflow
or the user mailing list, but not both at once. Those two forums are
appropriate places to get help with using Flink's APIs. And once you've
asked a question, please allow some days for folks to respond before trying
again.

The dev and community mailing lists are dedicated to other topics, and
aren't suitable for getting help. The community list is for discussions
related to meetups and conferences, and the dev list is for discussions and
decision making about the ongoing development of Flink itself.

In the interest of not further spamming the dev and community lists, let's
limit the follow-up on deserializers to the user ML.

Best regards,
David

On Fri, Jan 28, 2022 at 12:07 PM Hussein El Ghoul 
wrote:

> Hello,
>
> How to specify the deserialization schema for multiple Kafka topics using
> Flink (python)
>
> I want to read from multiple Kafka topics with JSON schema using
> FlinkKafkaConsumer, and I assume that I need to use
> JsonRowDeserializationSchema to deserialize the data. The schema of the
> topics is very large (around 1500 lines for each topic), so I want to read
> it from a file instead of manually typing the types in the program. How can
> I do that?
>
> 1. How to specify deserialization schema for multiple topics (3 topics)
> 2. How to read the JSON schema from a file?
>
>
> https://stackoverflow.com/q/70892579/13067721?sem=2
>
> Thanks in advance,
> Hussein
> Quiqup - Data Engineer


Read Json deserialization schema from file

2022-01-28 Thread Hussein El Ghoul
Hello,

How to specify the deserialization schema for multiple Kafka topics 
using Flink (python)

I want to read from multiple Kafka topics with JSON schema using
FlinkKafkaConsumer, and I assume that I need to use
JsonRowDeserializationSchema to deserialize the data. The schema of the
topics is very large (around 1500 lines for each topic), so I want to
read it from a file instead of manually typing the types in the program.
How can I do that?

1. How to specify deserialization schema for multiple topics (3 topics)
2. How to read the JSON schema from a file?


https://stackoverflow.com/q/70892579/13067721?sem=2


Thanks in advance,
Hussein
Quiqup - Data Engineer