Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Shuiqiang Chen
Hi Robert,

You can refer to
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
for the whole example.
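
The imports that the snippet quoted below (and the linked example) relies on are roughly the following; this is a sketch, and the exact module paths may differ between PyFlink releases:

```
from pyflink.common.serialization import JsonRowDeserializationSchema, SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
```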

Best,
Shuiqiang

Robert Cullen  wrote on Sat, Mar 13, 2021 at 4:01 AM:

> Shuiqiang, Can you include the import statements?  thanks.
>
> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen 
> wrote:
>
>> Hi Robert,
>>
>> Kafka Connector is provided in Python DataStream API since
>> release-1.12.0. And the documentation for it is lacking, we will make it up
>> soon.
>>
>> The following code shows how to apply KafkaConsumers and KafkaProducer:
>> ```
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>
>> # define the schema of the message from kafka; here the data is in json format.
>> type_info = Types.ROW_NAMED(
>>     ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>>     [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
>> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>
>> # define the kafka connection properties.
>> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}
>>
>> # create the KafkaConsumer and KafkaProducer with the specified topic name,
>> # serialization/deserialization schema and properties.
>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
>> kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)
>>
>> # set the kafka source to consume data from earliest offset.
>> kafka_consumer.set_start_from_earliest()
>>
>> # create a DataStream from kafka consumer source
>> ds = env.add_source(kafka_consumer)
>>
>> result_stream = ...
>>
>> # write the result into kafka by a kafka producer sink.
>> result_stream.add_sink(kafka_producer)
>> ```
>>
>> Best,
>> Shuiqiang
>>
> Robert Cullen  wrote on Sat, Mar 13, 2021 at 12:56 AM:
>>
>>> I’ve scoured the web looking for an example of using a Kafka source for
>>> a DataStream in python. Can someone finish this example?
>>>
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> ds = env.from_collection( KAFKA_SOURCE )
>>> ...
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
Same error.



On Fri, 12 Mar 2021 at 09:01, Chesnay Schepler  wrote:

> From the exception I would conclude that your core-site.xml file is not
> being picked up.
>
> AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so
> try setting HADOOP_CONF_DIR to the directory that the file resides in.
>
> On 3/12/2021 5:10 PM, sri hari kali charan Tummala wrote:
>
> If anyone has working Flink 1.8.1 code that reads S3 in IntelliJ in a public
> GitHub repo, please pass it on; that would be a huge help.
>
>
> Thanks
> Sri
>
> On Fri, 12 Mar 2021 at 08:08, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Which I already did in my pin still its not working.
>>
>> Thanks
>> Sri
>>
>> On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler 
>> wrote:
>>
>>> The concept of plugins does not exist in 1.8.1. As a result it should be
>>> sufficient for your use-case to add a dependency on flink-s3-fs-hadoop to
>>> your project.
>>>
>>> On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote:
>>>
>>> Let's close this issue guys please answer my questions. I am using Flink
>>> 1.8.1.
>>>
>>> Thanks
>>> Sri
>>>
>>> On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
 Also I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR I only see
 ConfigConstants.ENV_FLINK_LIB_DIR will this work ?

 Thanks
 Sri

 On Wed, Mar 10, 2021 at 1:23 PM sri hari kali charan Tummala <
 kali.tumm...@gmail.com> wrote:

> I am not getting what you both are talking about lets be clear.
>
> Plugin ? what is it ? Is it a Jar which I have to download from the
> Internet and place it in a folder ? Is this the Jar which I have to
> download ? (flink-s3-fs-hadoop) ?
>
> Will this belo solution work ?
>
> https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being
>
> Thanks
> Sri
>
>
>
> On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler 
> wrote:
>
>> Well, you could do this before running the job:
>>
>> // set the ConfigConstants.ENV_FLINK_PLUGINS_DIR environment
>> variable, pointing to a directory containing the plugins
>>
>> PluginManager pluginManager =
>> PluginUtils.createPluginManagerFromRootFolder(new Configuration());
>> Filesystem.initialize(new Configuration(), pluginManager);
>>
>> On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:
>>
>> Hi.
>>
>> I had the same problem. Flink use a plugins to access s3. When you
>> run local it starts a mini cluster and the mini cluster don’t load 
>> plugins.
>> So it’s not possible without modifying Flink.  In my case I wanted to
>> investigate save points through Flink processor API and the workaround 
>> was
>> to build my own version of the processor API and include the missing 
>> part.
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>
>> Den 10. mar. 2021 kl. 17.33 skrev sri hari kali charan Tummala
>>  :
>>
>> 
>> Flink,
>>
>> I am able to access Kinesis from Intellij but not S3 I have edited my
>> stack overflow question with kinesis code , Flink is still having issues
>> reading S3.
>>
>>
>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868
>>
>>
>> Thanks
>> Sri
>>
>> On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> my stack overflow question.
>>>
>>>
>>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>>>
>>> On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
 Here is my Intellij question.


 https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

 On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <
 kali.tumm...@gmail.com> wrote:

>
> Hi Flink Experts,
>>
>
> I am trying to read an S3 file from my Intellij using Flink I
>> am.comimg across Aws Auth error can someone help below are all the 
>> details.
>>
>
>
>> I have Aws credentials in homefolder/.aws/credentials
>>
>
> My Intellij Environment Variables:-
>> ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
>>
>> FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
>>
>> flink-conf.yaml file content:-
>>
>> fs.hdfs.hadoopconf: 
>> /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources

Re: [Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Magri, Sebastian
I validated it's still accepted by the connector but it's not in the 
documentation anymore.

It doesn't seem to help in my case.

Thanks,
Sebastian

From: Magri, Sebastian 
Sent: Friday, March 12, 2021 18:50
To: Timo Walther ; ro...@apache.org 
Cc: user 
Subject: Re: [Flink SQL] Leniency of JSON parsing

Hi Roman!

Seems like that option is no longer available.

Best Regards,
Sebastian

From: Roman Khachatryan 
Sent: Friday, March 12, 2021 16:59
To: Magri, Sebastian ; Timo Walther 

Cc: user 
Subject: Re: [Flink SQL] Leniency of JSON parsing

Hi Sebastian,

Did you try setting debezium-json-map-null-key-mode to DROP [1]?

I'm also pulling in Timo who might know better.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html#debezium-json-map-null-key-mode

Regards,
Roman



On Fri, Mar 12, 2021 at 2:42 PM Magri, Sebastian
 wrote:
>
> I'm trying to extract data from a Debezium CDC source, in which one of the 
> backing tables has an open schema nested JSON field like this:
>
>
> "objectives": {
> "items": [
> {
> "id": 1,
> "label": "test 1"
> "size": 1000.0
> },
> {
> "id": 2,
> "label": "test 2"
> "size": 500.0
> }
> ],
> "threshold": 10.0,
> "threshold_period": "hourly",
> "max_ms": 3.0
> }
>
>
> Any of these fields can be missing at any time, and there can also be 
> additional, different fields. It is guaranteed that a field will have the 
> same data type for all occurrences.
>
> For now, I really need to get only the `threshold` and `threshold_period` 
> fields. For which I'm using a field as the following:
>
>
> CREATE TABLE probes (
>   `objectives` ROW(`threshold` FLOAT, `threshold_period` STRING)
>   ...
> ) WITH (
>  ...
>   'format' = 'debezium-json',
>   'debezium-json.schema-include' = 'true',
>   'debezium-json.ignore-parse-errors' = 'true'
> )
>
>
> However I keep getting `NULL` values in my `objectives` column, or corrupt 
> JSON message exceptions when I disable the `ignore-parse-errors` option.
>
> Does JSON parsing need to match 100% the schema of the field or is it lenient?
>
> Is there any option or syntactic detail I'm missing?
>
> Best Regards,


Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Robert Cullen
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen  wrote:

> Hi Robert,
>
> Kafka Connector is provided in Python DataStream API since release-1.12.0.
> And the documentation for it is lacking, we will make it up soon.
>
> The following code shows how to apply KafkaConsumers and KafkaProducer:
> ```
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>
> # define the schema of the message from kafka; here the data is in json format.
> type_info = Types.ROW_NAMED(
>     ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>     [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>
> # define the kafka connection properties.
> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}
>
> # create the KafkaConsumer and KafkaProducer with the specified topic name,
> # serialization/deserialization schema and properties.
> kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
> kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)
>
> # set the kafka source to consume data from earliest offset.
> kafka_consumer.set_start_from_earliest()
>
> # create a DataStream from kafka consumer source
> ds = env.add_source(kafka_consumer)
>
> result_stream = ...
>
> # write the result into kafka by a kafka producer sink.
> result_stream.add_sink(kafka_producer)
> ```
>
> Best,
> Shuiqiang
>
> Robert Cullen  wrote on Sat, Mar 13, 2021 at 12:56 AM:
>
>> I’ve scoured the web looking for an example of using a Kafka source for a
>> DataStream in python. Can someone finish this example?
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> ds = env.from_collection( KAFKA_SOURCE )
>> ...
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490


Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Shuiqiang Chen
Hi Robert,

The Kafka connector has been provided in the Python DataStream API since
release 1.12.0. The documentation for it is still lacking; we will add it soon.

The following code shows how to use the Kafka consumer and producer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka; here the data is in json format.
type_info = Types.ROW_NAMED(
    ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
    [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name,
# serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer)

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen  wrote on Sat, Mar 13, 2021 at 12:56 AM:

> I’ve scoured the web looking for an example of using a Kafka source for a
> DataStream in python. Can someone finish this example?
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> ds = env.from_collection( KAFKA_SOURCE )
> ...
>
> --
> Robert Cullen
> 240-475-4490
>


Re: No saving data using rocksdb

2021-03-12 Thread Maminspapin
Roman, thank you for your attention.

It looks like you are absolutely right. Thank you very much for helping.

Before submitting a job I do the following steps:
1. ./bin/start-cluster.sh
2. ./bin/taskmanager.sh start

And in my code there are these lines:
env.setStateBackend(new
RocksDBStateBackend("file:///home/flink/checkpoint-data", true));

So I have a directory 'checkpoint-data', and there I can see a chk-x folder
(x = checkpoint index). I assume it is responsible for storing my state as a
full snapshot.

When I stop the app, this chk-x folder is removed, so I can't recover from
that point.

I added these lines:
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

And now it works.

P.S.: But maybe it's conceptually better to use a savepoint here (not a checkpoint).
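
For reference, roughly the same configuration in the Python DataStream API would look like the sketch below; the module and method names mirror the Java API but may vary between PyFlink releases, and the checkpoint interval is only an assumption:

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.checkpoint_config import ExternalizedCheckpointCleanup
from pyflink.datastream.state_backend import RocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60000)  # checkpoint interval in ms (assumed value)
env.set_state_backend(RocksDBStateBackend("file:///home/flink/checkpoint-data"))

# retain externalized checkpoints on cancellation so the job can be restored from chk-x
env.get_checkpoint_config().enable_externalized_checkpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
```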

Thanks again,
Yuri L.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Magri, Sebastian
Hi Roman!

Seems like that option is no longer available.

Best Regards,
Sebastian

From: Roman Khachatryan 
Sent: Friday, March 12, 2021 16:59
To: Magri, Sebastian ; Timo Walther 

Cc: user 
Subject: Re: [Flink SQL] Leniency of JSON parsing

Hi Sebastian,

Did you try setting debezium-json-map-null-key-mode to DROP [1]?

I'm also pulling in Timo who might know better.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html#debezium-json-map-null-key-mode

Regards,
Roman



On Fri, Mar 12, 2021 at 2:42 PM Magri, Sebastian
 wrote:
>
> I'm trying to extract data from a Debezium CDC source, in which one of the 
> backing tables has an open schema nested JSON field like this:
>
>
> "objectives": {
> "items": [
> {
> "id": 1,
> "label": "test 1"
> "size": 1000.0
> },
> {
> "id": 2,
> "label": "test 2"
> "size": 500.0
> }
> ],
> "threshold": 10.0,
> "threshold_period": "hourly",
> "max_ms": 3.0
> }
>
>
> Any of these fields can be missing at any time, and there can also be 
> additional, different fields. It is guaranteed that a field will have the 
> same data type for all occurrences.
>
> For now, I really need to get only the `threshold` and `threshold_period` 
> fields. For which I'm using a field as the following:
>
>
> CREATE TABLE probes (
>   `objectives` ROW(`threshold` FLOAT, `threshold_period` STRING)
>   ...
> ) WITH (
>  ...
>   'format' = 'debezium-json',
>   'debezium-json.schema-include' = 'true',
>   'debezium-json.ignore-parse-errors' = 'true'
> )
>
>
> However I keep getting `NULL` values in my `objectives` column, or corrupt 
> JSON message exceptions when I disable the `ignore-parse-errors` option.
>
> Does JSON parsing need to match 100% the schema of the field or is it lenient?
>
> Is there any option or syntactic detail I'm missing?
>
> Best Regards,


Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
Yep, makes sense.

On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan  wrote:

> > Want to confirm that the keys are GCed ( along with state ) once the
> (windows close + lateness ) ?
> Window state is cleared (as well as the window itself), but global
> state is not (unless you use TTL).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>
> Regards,
> Roman
>
> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
>  wrote:
> >
> > Sometimes writing it down makes you think. I now realize that this is
> not the right approach, given that merging windows will have their own
> states..and how the merge happens is really at the key level
> >
> >
> >
> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
> >>
> >> I intend to augment every event in a session  with a unique ID.  To
> keep the session lean, there is a PurgingTrigger on this aggregate that
> fires on a count of 1.
> >>
> >> >> (except that the number of keys can grow).
> >>
> >> Want to confirm that the keys are GCed ( along with state ) once the
> (windows close + lateness ) ?
> >>
> >>
> >>
> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan 
> wrote:
> >>>
> >>> Hi Vishal,
> >>>
> >>> There is no leak in the code you provided (except that the number of
> >>> keys can grow).
> >>> But as you figured out the state is scoped to key, not to window+key.
> >>>
> >>> Could you explain what you are trying to achieve and why do you need
> to combine
> >>> sliding windows with state scoped to window+key?
> >>>
> >>> Regards,
> >>> Roman
> >>>
> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
> >>>  wrote:
> >>> >
> >>> > Essentially, Does this code leak state
> >>> >
> >>> > private static class SessionIdProcessWindowFunction java.io.Serializable, VALUE extends java.io.Serializable>
> >>> > extends
> >>> > ProcessWindowFunction,
> KeyedSessionWithSessionID, KEY, TimeWindow> {
> >>> > private static final long serialVersionUID = 1L;
> >>> > private final static ValueStateDescriptor sessionId = new
> ValueStateDescriptor("session_uid",
> >>> > String.class);
> >>> >
> >>> > @Override
> >>> > public void process(KEY key,
> >>> > ProcessWindowFunction,
> KeyedSessionWithSessionID, KEY, TimeWindow>.Context context,
> >>> > Iterable> elements,
> Collector> out)
> >>> > throws Exception {
> >>> > // I need this scoped to key/window
> >>> > if (getRuntimeContext().getState(sessionId).value() == null) {
> >>> > UUID uuid = UUID.randomUUID();
> >>> > getRuntimeContext().getState(sessionId).update(uuid.toString());
> >>> > }
> >>> > String uuid = getRuntimeContext().getState(sessionId).value();
> >>> > out.collect(new
> KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
> >>> > }
> >>> > }
> >>> >
> >>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
> >>> >>
> >>> >> Hello folks,
> >>> >>   The suggestion is to use windowState() for a key
> key per window state and clear the state explicitly.  Also it seems that
> getRuntime().getState() will return a globalWindow() where state is shared
> among windows with the same key. I desire of course to have state scoped to
> a key per window and was wanting to use windowState().. The caveat is that
> my window is a Session Window and when I try to use clear()  I am thrown
> this exception  ( Session Windows are Merging Windows )
> >>> >>
> >>> >> Caused by: java.lang.UnsupportedOperationException: Per-window
> state is not allowed when using merging windows.
> >>> >>
> >>> >>
> >>> >> The questions are
> >>> >>
> >>> >> * How do I have state per session window/ per key and still be able
> to clear it ?
> >>> >> * Does getRuntime().getState() give me the clear() semantics for
> free along with state per window per key and thus I  have understood
> getRuntime().getState() wrong ?
> >>> >>
> >>> >> Regards.
> >>> >>
> >>> >>
> >>> >>
>


Re: [Schema Evolution] Cannot restore from savepoint after deleting field from POJO

2021-03-12 Thread Roman Khachatryan
Hi Alexis,

This looks like a bug, I've created a Jira ticket to address it [1].
Please feel free to provide any additional information.

In particular, whether you are able to reproduce it in any of the
subsequent releases.

[1]
https://issues.apache.org/jira/browse/FLINK-21752

Regards,
Roman


On Thu, Mar 11, 2021 at 5:36 PM Alexis Sarda-Espinosa
 wrote:
>
> Hi everyone,
>
>
>
> It seems I’m having either the same problem, or a problem similar to the one 
> mentioned here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-when-restoring-from-savepoint-with-missing-state-amp-POJO-modification-td39945.html
>
>
>
> I have a POJO class that is used in Flink state. The class is annotated with 
> @TypeInfo as described, e.g., here: 
> https://stackoverflow.com/a/64721838/5793905
>
>
>
> Now I want to remove a field from the POJO. This removal is also considered 
> in the corresponding TypeInfoFactory. However, after trying to restore from a 
> savepoint where the POJO still had the field I get this exception:
>
>
>
> 2021-03-10T20:51:30.406Z INFO  org.apache.flink.runtime.taskmanager.Task:960 
> … (6/8) (d630d5ff0d7ae4fbc428b151abebab52) switched from RUNNING to FAILED. 
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> KeyedCoProcessOperator_c535ac415eeb524d67c88f4a481077d2_(6/8) from any of the 
> 1 provided restore options.
>
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
>
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>
> ... 6 common frames omitted
>
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore heap backend
>
> at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>
> at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
>
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
>
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
> ... 8 common frames omitted
>
> Caused by: java.lang.NullPointerException: null
>
> at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
>
> at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
>
> at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
>
> at 
> org.apache.flink.api.common.typeutils.CompositeSerializer$PrecomputedParameters.precompute(CompositeSerializer.java:228)
>
> at 
> org.apache.flink.api.common.typeutils.CompositeSerializer.<init>(CompositeSerializer.java:51)
>
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer.<init>(TtlStateFactory.java:250)
>
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:359)
>
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:330)
>
> at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
>
> at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>
> at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>
> at 
> java.util.stream.AbstractPipeline.copyInto(Abstract

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread Chesnay Schepler
From the exception I would conclude that your core-site.xml file is not 
being picked up.


AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so 
try setting HADOOP_CONF_DIR to the directory that the file resides in.


On 3/12/2021 5:10 PM, sri hari kali charan Tummala wrote:
If anyone has working Flink 1.8.1 code that reads S3 in IntelliJ in a public
GitHub repo, please pass it on; that would be a huge help.



Thanks
Sri

On Fri, 12 Mar 2021 at 08:08, sri hari kali charan Tummala 
mailto:kali.tumm...@gmail.com>> wrote:


Which I already did in my pin still its not working.

Thanks
Sri

On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler mailto:ches...@apache.org>> wrote:

The concept of plugins does not exist in 1.8.1. As a result it
should be sufficient for your use-case to add a dependency on
flink-s3-fs-hadoop to your project.

On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote:

Let's close this issue guys please answer my questions. I am
using Flink 1.8.1.

Thanks
Sri

On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala
mailto:kali.tumm...@gmail.com>> wrote:

Also I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR I
only see ConfigConstants.ENV_FLINK_LIB_DIR will this work ?

Thanks
Sri

On Wed, Mar 10, 2021 at 1:23 PM sri hari kali charan
Tummala mailto:kali.tumm...@gmail.com>> wrote:

I am not getting what you both are talking about lets
be clear.

Plugin ? what is it ? Is it a Jar which I have to
download from the Internet and place it in a folder ?
Is this the Jar which I have to download ?
(flink-s3-fs-hadoop) ?

Will this belo solution work ?

https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being




Thanks
Sri



On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Well, you could do this before running the job:

// set the ConfigConstants.ENV_FLINK_PLUGINS_DIR
environment variable, pointing to a directory
containing the plugins

PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(new
Configuration());
Filesystem.initialize(new Configuration(),
pluginManager);

On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:

Hi.

I had the same problem. Flink use a plugins to
access s3. When you run local it starts a mini
cluster and the mini cluster don’t load plugins.
So it’s not possible without modifying Flink. 
In my case I wanted to investigate save points
through Flink processor API and the workaround
was to build my own version of the processor API
and include the missing part.

Med venlig hilsen / Best regards
Lasse Nedergaard



Den 10. mar. 2021 kl. 17.33 skrev sri hari kali
charan Tummala 
:


Flink,

I am able to access Kinesis from Intellij but
not S3 I have edited my stack overflow question
with kinesis code , Flink is still having
issues reading S3.


https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868




Thanks
Sri

On Tue, Mar 9, 2021 at 11:30 AM sri hari kali
charan Tummala mailto:kali.tumm...@gmail.com>> wrote:

my stack overflow question.


https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868



On Tue, Mar 9, 2021 at 11:28 AM sri hari
kali charan Tummala mailto:kali.tumm...@gmail.com>> wrote:


Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Robert Cullen
I’ve scoured the web looking for an example of using a Kafka source for a
DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...

-- 
Robert Cullen
240-475-4490


Re: Evenly Spreading Out Source Tasks

2021-03-12 Thread Aeden Jameson
Hi Matthias,

Yes, all the task managers have the same hardware/memory configuration.

Aeden

On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl  wrote:
>
> Hi Aeden,
> just to be sure: All task managers have the same hardware/memory 
> configuration, haven't they? I'm not 100% sure whether this affects the slot 
> selection in the end, but it looks like this parameter has also an influence 
> on the slot matching strategy preferring slots with less utilization of 
> resources [1].
>
> I'm gonna add Chesnay to the thread. He might have more insights here. 
> @Chesnay are there any other things that might affect the slot selection when 
> actually trying to evenly spread out the slots?
>
> Matthias
>
> [1] 
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
>
> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson  
> wrote:
>>
>> Hi Arvid,
>>
>>   Thanks for responding. I did check the configuration tab of the job
>> manager and the setting cluster.evenly-spread-out-slots: true is
>> there. However I'm still observing unevenness in the distribution of
>> source tasks. Perhaps this additional information could shed light.
>>
>> Version: 1.12.1
>> Deployment Mode: Application
>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
>> Flink operator https://github.com/lyft/flinkk8soperator
>>
>> I did place the setting under the flinkConfig section,
>>
>> apiVersion: flink.k8s.io/v1beta1
>> 
>> spec:
>>   flinkConfig:
>> cluster.evenly-spread-out-slots: true
>> high-availability: zookeeper
>> ...
>> state.backend: filesystem
>> ...
>>   jobManagerConfig:
>> envConfig:
>> 
>>
>> Would you explain how the setting ends up evenly distributing active
>> kafka consumers? Is it a result of just assigning tasks toTM1, TM2,
>> TM3 ... TM18 in order and starting again. In my case I have 36
>> partitions and 18 nodes so after the second pass in assignment I would
>> end up with 2 subtasks in the consumer group on each TM. And then
>> subsequent passes result in inactive consumers.
>>
>>
>> Thank you,
>> Aeden
>>
>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise  wrote:
>> >
>> > Hi Aeden,
>> >
>> > the option that you mentioned should have actually caused your desired 
>> > behavior. Can you double-check that it's set for the job (you can look at 
>> > the config in the Flink UI to be 100% sure).
>> >
>> > Another option is to simply give all task managers 2 slots. In that way, 
>> > the scheduler can only evenly distribute.
>> >
>> > On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson  
>> > wrote:
>> >>
>> >> I have a cluster with 18 task managers 4 task slots each running a
>> >> job whose source/sink(s) are declared with FlinkSQL using the Kafka
>> >> connector. The topic being read has 36 partitions. The problem I'm
>> >> observing is that the subtasks for the sources are not evenly
>> >> distributed. For example, 1 task manager will have 4 active source
>> >> subtasks and other TM's none. Is there a way to force  each task
>> >> manager to have 2 active source subtasks.  I tried using the setting
>> >> cluster.evenly-spread-out-slots: true , but that didn't have the
>> >> desired effect.
>> >>
>> >> --
>> >> Thank you,
>> >> Aeden


Handling Bounded Sources with KafkaSource

2021-03-12 Thread Rion Williams
Hi all,

I've been using the KafkaSource API as opposed to the classic consumer and
things have been going well. I configured my source such that it could be
used in either a streaming or bounded mode, with the bounded approach
specifically aimed at improving testing (unit/integration).

I've noticed that when I attempt to run through a test - it seems that the
pipeline never acknowledges the "end" of the stream in a bounded context
and just runs forever and never makes it to my assert.

Does anything look glaringly wrong with how the source is being defined?

object KafkaEventSource {

fun withParameters(parameters: ParameterTool): KafkaSource {
val schemaRegistryUrl = parameters.getRequired("schema.registry.url")

val builder = KafkaSource.builder()
.setBootstrapServers(parameters.getRequired("bootstrap.servers"))
.setGroupId(parameters.getRequired("group.id"))
.setStartingOffsets(OffsetsInitializer.earliest())
.setProperty("schema.registry.url", schemaRegistryUrl)
.setTopics(parameters.getRequired("topic"))
.setDeserializer(EventDeserializer(schemaRegistryUrl))

if (parameters.getBoolean("bounded", false)) {
builder.setBounded(OffsetsInitializer.latest())
}

return builder.build()
}
}

I can verify that the generated source has its boundedness set properly
and all of the configuration options are correct.

My test itself is fairly simple and can be broken down as follows:

   1. Inject records into a Kafka Topic
   2. Initialize my Flink job using all of my testing parameters
   3. Apply my assertion (in this case verifying that a JdbcSink wrote to a
   specific database)

@Test
fun `Example `(){
// Arrange
val events = getTestEvents()
sendToKafka(events, parameters)

// Act
EntityIdentificationJob.run(parameters)

// Assert
val users = queryCount("SELECT * FROM users", connection)
assertEquals(1, users)
}

Where my job itself is broken down further and reads from the source,
performs a process function into multiple side outputs and writes each of
them to a distinct JdbcSink based on the type:

@JvmStatic
fun main(args: Array<String>) {
val parameters = loadParams(args)
val stream = StreamExecutionEnvironment.getExecutionEnvironment()

// Read from Kafka
val entities = stream
   .fromSource(KafkaEventSource.withParameters(parameters),
WatermarkStrategy.noWatermarks(), "kafka")
   .process(IdentifyEntitiesFunction())

// Write out each tag to its respective sink
for (entityType in EntityTypes.all) {
entities
.getSideOutput(entityType)
.addSink(PostgresEntitySink.withEntity(entityType.typeInfo,
parameters))
}

stream.execute(parameters.getRequired("application"))
}

I can verify in the logs that my sink is being executed and writing to the
appropriate database, however the job itself never finishes. I've tried it
using a single Kafka partition as well as multiple partitions and even
commented out the logic related to writing to the database. It still just
seems to run ... forever.

Any recommendations? Perhaps there's a bad configuration or setting that
isn't being used as intended?

Thanks,

Rion


Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
If anyone has working Flink 1.8.1 code that reads S3 in IntelliJ in a public
GitHub repo, please pass it on; that would be a huge help.


Thanks
Sri

On Fri, 12 Mar 2021 at 08:08, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Which I already did in my pin still its not working.
>
> Thanks
> Sri
>
> On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler  wrote:
>
>> The concept of plugins does not exist in 1.8.1. As a result it should be
>> sufficient for your use-case to add a dependency on flink-s3-fs-hadoop to
>> your project.
>>
>> On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote:
>>
>> Let's close this issue guys please answer my questions. I am using Flink
>> 1.8.1.
>>
>> Thanks
>> Sri
>>
>> On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Also I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR I only see
>>> ConfigConstants.ENV_FLINK_LIB_DIR will this work ?
>>>
>>> Thanks
>>> Sri
>>>
>>> On Wed, Mar 10, 2021 at 1:23 PM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
 I am not getting what you both are talking about lets be clear.

 Plugin ? what is it ? Is it a Jar which I have to download from the
 Internet and place it in a folder ? Is this the Jar which I have to
 download ? (flink-s3-fs-hadoop) ?

 Will this belo solution work ?

 https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being

 Thanks
 Sri



 On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler 
 wrote:

> Well, you could do this before running the job:
>
> // set the ConfigConstants.ENV_FLINK_PLUGINS_DIR environment variable,
> pointing to a directory containing the plugins
>
> PluginManager pluginManager =
> PluginUtils.createPluginManagerFromRootFolder(new Configuration());
> Filesystem.initialize(new Configuration(), pluginManager);
>
> On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:
>
> Hi.
>
> I had the same problem. Flink use a plugins to access s3. When you run
> local it starts a mini cluster and the mini cluster don’t load plugins. So
> it’s not possible without modifying Flink.  In my case I wanted to
> investigate save points through Flink processor API and the workaround was
> to build my own version of the processor API and include the missing part.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 10. mar. 2021 kl. 17.33 skrev sri hari kali charan Tummala
>  :
>
> 
> Flink,
>
> I am able to access Kinesis from Intellij but not S3 I have edited my
> stack overflow question with kinesis code , Flink is still having issues
> reading S3.
>
>
> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868
>
>
> Thanks
> Sri
>
> On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> my stack overflow question.
>>
>>
>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>>
>> On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Here is my Intellij question.
>>>
>>>
>>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>>>
>>> On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>

 Hi Flink Experts,
>

 I am trying to read an S3 file from my Intellij using Flink I
> am.comimg across Aws Auth error can someone help below are all the 
> details.
>


> I have Aws credentials in homefolder/.aws/credentials
>

 My Intellij Environment Variables:-
> ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
>
> FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
>
> flink-conf.yaml file content:-
>
> fs.hdfs.hadoopconf: 
> /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
>
> core-site.xml file content:-
>
>  href="configuration.xsl"?>
> fs.s3.impl
> org.apache.hadoop.fs.s3a.S3AFileSystem  
>   fs.s3.buffer.dir
> /tmp
> fs.s3a.server-side-encryption-algorithm
> AES256
> fs.s3a.aws.credentials.provider
> org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider  
>   fs.s3a.access.key 

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
Which I already did in my pom; it's still not working.

Thanks
Sri

On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler  wrote:

> The concept of plugins does not exist in 1.8.1. As a result it should be
> sufficient for your use-case to add a dependency on flink-s3-fs-hadoop to
> your project.
>
> On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote:
>
> Let's close this issue guys please answer my questions. I am using Flink
> 1.8.1.
>
> Thanks
> Sri
>
> On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Also I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR I only see
>> ConfigConstants.ENV_FLINK_LIB_DIR will this work ?
>>
>> Thanks
>> Sri
>>
>> On Wed, Mar 10, 2021 at 1:23 PM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> I am not getting what you both are talking about lets be clear.
>>>
>>> Plugin ? what is it ? Is it a Jar which I have to download from the
>>> Internet and place it in a folder ? Is this the Jar which I have to
>>> download ? (flink-s3-fs-hadoop) ?
>>>
>>> Will this belo solution work ?
>>>
>>> https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being
>>>
>>> Thanks
>>> Sri
>>>
>>>
>>>
>>> On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler 
>>> wrote:
>>>
 Well, you could do this before running the job:

 // set the ConfigConstants.ENV_FLINK_PLUGINS_DIR environment variable,
 pointing to a directory containing the plugins

 PluginManager pluginManager =
 PluginUtils.createPluginManagerFromRootFolder(new Configuration());
 Filesystem.initialize(new Configuration(), pluginManager);

 On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:

 Hi.

 I had the same problem. Flink use a plugins to access s3. When you run
 local it starts a mini cluster and the mini cluster don’t load plugins. So
 it’s not possible without modifying Flink.  In my case I wanted to
 investigate save points through Flink processor API and the workaround was
 to build my own version of the processor API and include the missing part.

 Med venlig hilsen / Best regards
 Lasse Nedergaard


 Den 10. mar. 2021 kl. 17.33 skrev sri hari kali charan Tummala
  :

 
 Flink,

 I am able to access Kinesis from Intellij but not S3 I have edited my
 stack overflow question with kinesis code , Flink is still having issues
 reading S3.


 https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868


 Thanks
 Sri

 On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala <
 kali.tumm...@gmail.com> wrote:

> my stack overflow question.
>
>
> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>
> On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Here is my Intellij question.
>>
>>
>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>>
>> On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>>
>>> Hi Flink Experts,

>>>
>>> I am trying to read an S3 file from my Intellij using Flink I
 am.comimg across Aws Auth error can someone help below are all the 
 details.

>>>
>>>
 I have Aws credentials in homefolder/.aws/credentials

>>>
>>> My Intellij Environment Variables:-
 ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1

 FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config

 flink-conf.yaml file content:-

 fs.hdfs.hadoopconf: 
 /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config

 core-site.xml file content:-

 >>> href="configuration.xsl"?>
 fs.s3.impl
 org.apache.hadoop.fs.s3a.S3AFileSystem   
  fs.s3.buffer.dir
 /tmp
 fs.s3a.server-side-encryption-algorithm
 AES256
 fs.s3a.aws.credentials.provider
 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider   
  fs.s3a.access.key   
  
 fs.s3a.secret.key   
  fs.s3a.session.token
 
 fs.s3a.proxy.host   
  fs.s3a.proxy.port
 8099
 fs.s3a.proxy.username
 fs.s3a.proxy.password
 

>>>

Re: [Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Roman Khachatryan
Hi Sebastian,

Did you try setting debezium-json-map-null-key-mode to DROP [1]?

I'm also pulling in Timo who might know better.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html#debezium-json-map-null-key-mode

Regards,
Roman



On Fri, Mar 12, 2021 at 2:42 PM Magri, Sebastian
 wrote:
>
> I'm trying to extract data from a Debezium CDC source, in which one of the 
> backing tables has an open schema nested JSON field like this:
>
>
> "objectives": {
> "items": [
> {
> "id": 1,
> "label": "test 1"
> "size": 1000.0
> },
> {
> "id": 2,
> "label": "test 2"
> "size": 500.0
> }
> ],
> "threshold": 10.0,
> "threshold_period": "hourly",
> "max_ms": 3.0
> }
>
>
> Any of these fields can be missing at any time, and there can also be 
> additional, different fields. It is guaranteed that a field will have the 
> same data type for all occurrences.
>
> For now, I really need to get only the `threshold` and `threshold_period` 
> fields. For which I'm using a field as the following:
>
>
> CREATE TABLE probes (
>   `objectives` ROW(`threshold` FLOAT, `threshold_period` STRING)
>   ...
> ) WITH (
>  ...
>   'format' = 'debezium-json',
>   'debezium-json.schema-include' = 'true',
>   'debezium-json.ignore-parse-errors' = 'true'
> )
>
>
> However I keep getting `NULL` values in my `objectives` column, or corrupt 
> JSON message exceptions when I disable the `ignore-parse-errors` option.
>
> Does JSON parsing need to match 100% the schema of the field or is it lenient?
>
> Is there any option or syntactic detail I'm missing?
>
> Best Regards,


Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Roman Khachatryan
> Want to confirm that the keys are GCed ( along with state ) once the  
> (windows close + lateness ) ?
Window state is cleared (as well as the window itself), but global
state is not (unless you use TTL).

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
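
As a side note, for anyone doing the same from the Python DataStream API, a minimal sketch of a state descriptor with TTL enabled (assuming a PyFlink release that exposes StateTtlConfig; the names mirror the Java API in [1]) looks like this:

```
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

# expire entries one hour after the last write; never return expired values
ttl_config = StateTtlConfig \
    .new_builder(Time.hours(1)) \
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
    .build()

session_id = ValueStateDescriptor("session_uid", Types.STRING())
session_id.enable_time_to_live(ttl_config)
```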

Regards,
Roman

On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
 wrote:
>
> Sometimes writing it down makes you think. I now realize that this is not the 
> right approach, given that merging windows will have their own states..and 
> how the merge happens is really at the key level
>
>
>
> On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi  
> wrote:
>>
>> I intend to augment every event in a session  with a unique ID.  To keep the 
>> session lean, there is a PurgingTrigger on this aggregate that  fires on a 
>> count of 1.
>>
>> >> (except that the number of keys can grow).
>>
>> Want to confirm that the keys are GCed ( along with state ) once the  
>> (windows close + lateness ) ?
>>
>>
>>
>> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan  wrote:
>>>
>>> Hi Vishal,
>>>
>>> There is no leak in the code you provided (except that the number of
>>> keys can grow).
>>> But as you figured out the state is scoped to key, not to window+key.
>>>
>>> Could you explain what you are trying to achieve and why do you need to 
>>> combine
>>> sliding windows with state scoped to window+key?
>>>
>>> Regards,
>>> Roman
>>>
>>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>>>  wrote:
>>> >
>>> > Essentially, Does this code leak state
>>> >
>>> > private static class SessionIdProcessWindowFunction>> > java.io.Serializable, VALUE extends java.io.Serializable>
>>> > extends
>>> > ProcessWindowFunction, 
>>> > KeyedSessionWithSessionID, KEY, TimeWindow> {
>>> > private static final long serialVersionUID = 1L;
>>> > private final static ValueStateDescriptor sessionId = new 
>>> > ValueStateDescriptor("session_uid",
>>> > String.class);
>>> >
>>> > @Override
>>> > public void process(KEY key,
>>> > ProcessWindowFunction, 
>>> > KeyedSessionWithSessionID, KEY, TimeWindow>.Context context,
>>> > Iterable> elements, 
>>> > Collector> out)
>>> > throws Exception {
>>> > // I need this scoped to key/window
>>> > if (getRuntimeContext().getState(sessionId).value() == null) {
>>> > UUID uuid = UUID.randomUUID();
>>> > getRuntimeContext().getState(sessionId).update(uuid.toString());
>>> > }
>>> > String uuid = getRuntimeContext().getState(sessionId).value();
>>> > out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), 
>>> > uuid));
>>> > }
>>> > }
>>> >
>>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi 
>>> >  wrote:
>>> >>
>>> >> Hello folks,
>>> >>   The suggestion is to use windowState() for a key key 
>>> >> per window state and clear the state explicitly.  Also it seems that 
>>> >> getRuntime().getState() will return a globalWindow() where state is 
>>> >> shared among windows with the same key. I desire of course to have state 
>>> >> scoped to a key per window and was wanting to use windowState().. The 
>>> >> caveat is that my window is a Session Window and when I try to use 
>>> >> clear()  I am thrown this exception  ( Session Windows are Merging 
>>> >> Windows )
>>> >>
>>> >> Caused by: java.lang.UnsupportedOperationException: Per-window state is 
>>> >> not allowed when using merging windows.
>>> >>
>>> >>
>>> >> The questions are
>>> >>
>>> >> * How do I have state per session window/ per key and still be able to 
>>> >> clear it ?
>>> >> * Does getRuntime().getState() give me the clear() semantics for free 
>>> >> along with state per window per key and thus I  have understood  
>>> >> getRuntime().getState() wrong ?
>>> >>
>>> >> Regards.
>>> >>
>>> >>
>>> >>


Re: DataStream in batch mode - handling (un)ordered bounded data

2021-03-12 Thread Dawid Wysakowicz
Hi Alexis,

As of now there is no such feature in the DataStream API. The BATCH mode in
the DataStream API is a new feature, and we would be interested to hear about
the use cases people want to use it for, to identify potential areas to
improve. What you are suggesting generally makes sense, so I think it would
be nice if you could create a Jira ticket for it.
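
For context, the BATCH execution mode discussed here is selected per job, either via the execution.runtime-mode option or programmatically. A sketch in the Python DataStream API (assuming a release that exposes RuntimeExecutionMode) would be:

```
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

env = StreamExecutionEnvironment.get_execution_environment()
# run this DataStream program with batch scheduling and semantics on bounded inputs
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
```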

Best,

Dawid

On 12/03/2021 15:37, Alexis Sarda-Espinosa wrote:
>
> Hello,
>
>  
>
> Regarding the new BATCH mode of the data stream API, I see that the
> documentation states that some operators will process all data for a
> given key before moving on to the next one. However, I don’t see how
> Flink is supposed to know whether the input will provide all data for
> a given key sequentially. In the DataSet API, an (undocumented?)
> feature is using SplitDataProperties
> (https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/io/SplitDataProperties.html
> )
> to specify different grouping/partitioning/sorting properties, so if
> the data is pre-sorted (e.g. when reading from a database), some
> operations can be optimized. Will the DataStream API get something
> similar?
>
>  
>
> Regards,
>
> Alexis.
>
>  
>




DataStream in batch mode - handling (un)ordered bounded data

2021-03-12 Thread Alexis Sarda-Espinosa
Hello,

Regarding the new BATCH mode of the data stream API, I see that the 
documentation states that some operators will process all data for a given key 
before moving on to the next one. However, I don't see how Flink is supposed to 
know whether the input will provide all data for a given key sequentially. In 
the DataSet API, an (undocumented?) feature is using SplitDataProperties 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/io/SplitDataProperties.html)
 to specify different grouping/partitioning/sorting properties, so if the data 
is pre-sorted (e.g. when reading from a database), some operations can be 
optimized. Will the DataStream API get something similar?

Regards,
Alexis.



Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread Chesnay Schepler
The concept of plugins does not exist in 1.8.1. As a result it should be 
sufficient for your use-case to add a dependency on flink-s3-fs-hadoop 
to your project.


On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote:
Let's close this issue guys please answer my questions. I am using 
Flink 1.8.1.


Thanks
Sri

On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala 
mailto:kali.tumm...@gmail.com>> wrote:


Also I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR I only see
ConfigConstants.ENV_FLINK_LIB_DIR will this work ?

Thanks
Sri

On Wed, Mar 10, 2021 at 1:23 PM sri hari kali charan Tummala
mailto:kali.tumm...@gmail.com>> wrote:

I am not getting what you both are talking about lets be clear.

Plugin ? what is it ? Is it a Jar which I have to download
from the Internet and place it in a folder ? Is this the Jar
which I have to download ? (flink-s3-fs-hadoop) ?

Will this belo solution work ?

https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being




Thanks
Sri



On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Well, you could do this before running the job:

// set the ConfigConstants.ENV_FLINK_PLUGINS_DIR
environment variable, pointing to a directory containing
the plugins

PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(new
Configuration());
Filesystem.initialize(new Configuration(), pluginManager);

On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:

Hi.

I had the same problem. Flink use a plugins to access s3.
When you run local it starts a mini cluster and the mini
cluster don’t load plugins. So it’s not possible without
modifying Flink.  In my case I wanted to investigate save
points through Flink processor API and the workaround was
to build my own version of the processor API and include
the missing part.

Med venlig hilsen / Best regards
Lasse Nedergaard



Den 10. mar. 2021 kl. 17.33 skrev sri hari kali charan
Tummala 
:


Flink,

I am able to access Kinesis from Intellij but not S3 I
have edited my stack overflow question with kinesis code
, Flink is still having issues reading S3.


https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868




Thanks
Sri

On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan
Tummala mailto:kali.tumm...@gmail.com>> wrote:

my stack overflow question.


https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868



On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan
Tummala mailto:kali.tumm...@gmail.com>> wrote:

Here is my Intellij question.


https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868



On Mon, Mar 8, 2021 at 11:22 AM sri hari kali
charan Tummala mailto:kali.tumm...@gmail.com>> wrote:


Hi Flink Experts,


I am trying to read an S3 file from my
Intellij using Flink I am.comimg across
Aws Auth error can someone help below
are all the details.

I have Aws credentials in
homefolder/.aws/credentials


My Intellij Environment Variables:-
ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1

FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config

flink-conf.yaml file content:-

fs.hdfs.hadoopconf: 
/Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-

Re: Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-12 Thread Sush Bankapura
Hi Roman and Till,

Thank you very much for your responses. 

With regard to the workload variation across the jobs, let me put it like this:
1. We have some jobs which are CPU intensive (with only operator state being
persisted), and there are other jobs which are not so CPU intensive but have
I/O operations.
2. The traffic for each of the above jobs keeps increasing over time as more
data is streamed in.

Our understanding is that separating the two job types into two different
clusters is one of the solutions:
1. Cluster #1 should have as many slots as the number of CPU cores for the
CPU-intensive job type.
2. Cluster #2 should have more slots than the number of CPU cores for the
I/O-intensive job types.

We will study the other options you folks proposed.

Regards,
Sushruth


On 2021/03/12 12:35:13, Till Rohrmann  wrote: 
> Hi Sushruth,
> 
> if your jobs need significantly different configurations, then I would
> suggest to think about dedicated clusters per job. That way you can
> configure the cluster to work best for the respective job. Of course,
> running multiple clusters instead of a single one comes at the cost of more
> overhead which you pay for the multiple Flink processes.
> 
> If you don't want/can't use the per job clusters, then there is not much
> else you can do to control how the resources of a session cluster are
> distributed among different jobs other than what Roman has already said.
> The most effective way is to reduce the parallelism of the jobs which need
> fewer resources or splitting chains up into units which consume/require
> the same set of resources to run (CPU, memory). In the future, this problem
> will most likely be solved by FLIP-53 [1] which allows to specify resource
> requirements for operators and, thus, the slots a job needs.
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> 
> Cheers,
> Till
> 
> On Fri, Mar 12, 2021 at 12:20 PM Roman Khachatryan  wrote:
> 
> > Hi,
> >
> > Do I understand correctly that:
> > 1. The workload varies across the jobs but stays the same for the same job
> > 2. With a small number of slots per TM you are concerned about uneven
> > resource utilization when running low- and high-intensive jobs on the
> > same cluster simultaneously?
> >
> > If so, wouldn't reducing parallelism of low-intensive jobs help?
> > Other options to consider are putting subtasks of high-intensive job
> > into different slot-sharing groups; or breaking operator chains
> > explicitly [1]
> >
> > There are also a number of improvements coming in 1.13 release: [2][3][4].
> >
> > I'm pulling in Till and Robert who knows this area better.
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
> > [2] https://issues.apache.org/jira/browse/FLINK-21267
> > [3] https://issues.apache.org/jira/browse/FLINK-10404
> > [4] https://issues.apache.org/jira/browse/FLINK-14187
> >
> > Regards,
> > Roman
> >
> > On Fri, Mar 12, 2021 at 5:03 AM Sush Bankapura
> >  wrote:
> > >
> > > Hi,
> > >
> > > We  have multiple jobs that need to be deployed to a Flink cluster.
> > Parallelism for jobs vary and dependent on the type of work being done  and
> > so are the memory requirements. All jobs currently use the same state
> > backend.  Since the workloads handled by each job is different, the scaling
> > pattern also varies. We run all our jobs in a  single Flink cluster (7 VMs
> > with the same instance configuration)
> > >
> > >  Most of what I have read in the Flink documentation indicates any of
> > the following for setting the task slots
> > >
> > > 1. As a rule of thumb, a good default number of task slots will be the
> > number of CPU cores. With hyper-threading, each slot then takes 2 or more
> > hardware thread contexts. If you are doing any Blocking IO operations in
> > Flink job, it is suggested to have more number of slots than the core.
> > >
> > > 2. A Flink cluster needs exactly as many task slots as the highest
> > parallelism used in the job. No need to calculate how many tasks (with
> > varying parallelism) a program contains in total.
> > >
> > > I did not find documentation  for the task slot setting for the scenario
> > I have enumerated. While setting a lower value for the task slots seems to
> > work better for jobs which need to process high amounts of traffic than the
> > other jobs which process lower amounts of traffic, but this will be
> > inefficient if the slots are assigned to jobs which work on lower volumes
> > of traffic.
> > >
> > > Depending on the workload handled by each Flink job, it seems that we
> > would need to set up as many clusters.
> > >
> > > 1. Is this the only option available?
> > > 2. Are there any guidelines on deciding on the number of task slots in
> > such an environment?
> > >
> > > Thanks,
> > > Sushruth
> >
> 


[Flink SQL] Leniency of JSON parsing

2021-03-12 Thread Magri, Sebastian
I'm trying to extract data from a Debezium CDC source, in which one of the 
backing tables has an open schema nested JSON field like this:


"objectives": {
"items": [
{
"id": 1,
"label": "test 1"
"size": 1000.0
},
{
"id": 2,
"label": "test 2"
"size": 500.0
}
],
"threshold": 10.0,
"threshold_period": "hourly",
"max_ms": 3.0
}


Any of these fields can be missing at any time, and there can also be 
additional, different fields. It is guaranteed that a field will have the same 
data type for all occurrences.

For now, I really need to get only the `threshold` and `threshold_period`
fields, for which I'm using a column defined as follows:


CREATE TABLE probes (
  `objectives` ROW(`threshold` FLOAT, `threshold_period` STRING)
  ...
) WITH (
 ...
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true',
  'debezium-json.ignore-parse-errors' = 'true'
)


However I keep getting `NULL` values in my `objectives` column, or corrupt JSON 
message exceptions when I disable the `ignore-parse-errors` option.

Does JSON parsing need to match 100% the schema of the field or is it lenient?

Is there any option or syntactic detail I'm missing?

Best Regards,


Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
Sometimes writing it down makes you think. I now realize that this is not
the right approach, given that merging windows will have their own
states, and how the merge happens is really at the key level.



On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi 
wrote:

> I intend to augment every event in a session  with a unique ID.  To keep
> the session lean, there is a PurgingTrigger on this aggregate that  fires
> on a count of 1.
>
> >> (except that the number of keys can grow).
>
> Want to confirm that the keys are GCed ( along with state ) once the
> (windows close + lateness ) ?
>
>
>
> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan 
> wrote:
>
>> Hi Vishal,
>>
>> There is no leak in the code you provided (except that the number of
>> keys can grow).
>> But as you figured out the state is scoped to key, not to window+key.
>>
>> Could you explain what you are trying to achieve and why do you need to
>> combine
>> sliding windows with state scoped to window+key?
>>
>> Regards,
>> Roman
>>
>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>>  wrote:
>> >
>> > Essentially, Does this code leak state
>> >
>> > private static class SessionIdProcessWindowFunction> java.io.Serializable, VALUE extends java.io.Serializable>
>> > extends
>> > ProcessWindowFunction,
>> KeyedSessionWithSessionID, KEY, TimeWindow> {
>> > private static final long serialVersionUID = 1L;
>> > private final static ValueStateDescriptor sessionId = new
>> ValueStateDescriptor("session_uid",
>> > String.class);
>> >
>> > @Override
>> > public void process(KEY key,
>> > ProcessWindowFunction,
>> KeyedSessionWithSessionID, KEY, TimeWindow>.Context context,
>> > Iterable> elements,
>> Collector> out)
>> > throws Exception {
>> > // I need this scoped to key/window
>> > if (getRuntimeContext().getState(sessionId).value() == null) {
>> > UUID uuid = UUID.randomUUID();
>> > getRuntimeContext().getState(sessionId).update(uuid.toString());
>> > }
>> > String uuid = getRuntimeContext().getState(sessionId).value();
>> > out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(),
>> uuid));
>> > }
>> > }
>> >
>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> >>
>> >> Hello folks,
>> >>   The suggestion is to use windowState() for a key key
>> per window state and clear the state explicitly.  Also it seems that
>> getRuntime().getState() will return a globalWindow() where state is shared
>> among windows with the same key. I desire of course to have state scoped to
>> a key per window and was wanting to use windowState().. The caveat is that
>> my window is a Session Window and when I try to use clear()  I am thrown
>> this exception  ( Session Windows are Merging Windows )
>> >>
>> >> Caused by: java.lang.UnsupportedOperationException: Per-window state
>> is not allowed when using merging windows.
>> >>
>> >>
>> >> The questions are
>> >>
>> >> * How do I have state per session window/ per key and still be able to
>> clear it ?
>> >> * Does getRuntime().getState() give me the clear() semantics for free
>> along with state per window per key and thus I  have understood
>> getRuntime().getState() wrong ?
>> >>
>> >> Regards.
>> >>
>> >>
>> >>
>>
>


Re: No saving data using rocksdb

2021-03-12 Thread Maminspapin
Hey, Roman

I use the same key every time.
And I get the correct value in StateManager every time the processElement()
method executes.

But then I stop the job and submit it again.
And the first execution of processElement() gets me null from the state store.
The key wasn't changed.

So I'm confused.

Thanks,
Yuri L.





Re: Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-12 Thread Till Rohrmann
Hi Sushruth,

if your jobs need significantly different configurations, then I would
suggest to think about dedicated clusters per job. That way you can
configure the cluster to work best for the respective job. Of course,
running multiple clusters instead of a single one comes at the cost of more
overhead which you pay for the multiple Flink processes.

If you don't want/can't use the per job clusters, then there is not much
else you can do to control how the resources of a session cluster are
distributed among different jobs other than what Roman has already said.
The most effective way is to reduce the parallelism of the jobs which need
fewer resources or splitting chains up into units which consume/require
the same set of resources to run (CPU, memory). In the future, this problem
will most likely be solved by FLIP-53 [1] which allows to specify resource
requirements for operators and, thus, the slots a job needs.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management

Cheers,
Till

On Fri, Mar 12, 2021 at 12:20 PM Roman Khachatryan  wrote:

> Hi,
>
> Do I understand correctly that:
> 1. The workload varies across the jobs but stays the same for the same job
> 2. With a small number of slots per TM you are concerned about uneven
> resource utilization when running low- and high-intensive jobs on the
> same cluster simultaneously?
>
> If so, wouldn't reducing parallelism of low-intensive jobs help?
> Other options to consider are putting subtasks of high-intensive job
> into different slot-sharing groups; or breaking operator chains
> explicitly [1]
>
> There are also a number of improvements coming in 1.13 release: [2][3][4].
>
> I'm pulling in Till and Robert who knows this area better.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
> [2] https://issues.apache.org/jira/browse/FLINK-21267
> [3] https://issues.apache.org/jira/browse/FLINK-10404
> [4] https://issues.apache.org/jira/browse/FLINK-14187
>
> Regards,
> Roman
>
> On Fri, Mar 12, 2021 at 5:03 AM Sush Bankapura
>  wrote:
> >
> > Hi,
> >
> > We  have multiple jobs that need to be deployed to a Flink cluster.
> Parallelism for jobs vary and dependent on the type of work being done  and
> so are the memory requirements. All jobs currently use the same state
> backend.  Since the workloads handled by each job is different, the scaling
> pattern also varies. We run all our jobs in a  single Flink cluster (7 VMs
> with the same instance configuration)
> >
> >  Most of what I have read in the Flink documentation indicates any of
> the following for setting the task slots
> >
> > 1. As a rule of thumb, a good default number of task slots will be the
> number of CPU cores. With hyper-threading, each slot then takes 2 or more
> hardware thread contexts. If you are doing any Blocking IO operations in
> Flink job, it is suggested to have more number of slots than the core.
> >
> > 2. A Flink cluster needs exactly as many task slots as the highest
> parallelism used in the job. No need to calculate how many tasks (with
> varying parallelism) a program contains in total.
> >
> > I did not find documentation  for the task slot setting for the scenario
> I have enumerated. While setting a lower value for the task slots seems to
> work better for jobs which need to process high amounts of traffic than the
> other jobs which process lower amounts of traffic, but this will be
> inefficient if the slots are assigned to jobs which work on lower volumes
> of traffic.
> >
> > Depending on the workload handled by each Flink job, it seems that we
> would need to set up as many clusters.
> >
> > 1. Is this the only option available?
> > 2. Are there any guidelines on deciding on the number of task slots in
> such an environment?
> >
> > Thanks,
> > Sushruth
>


Re: Filtering lines in parquet

2021-03-12 Thread Avi Levi
Cool, thanks!

On Fri, Mar 12, 2021, 13:15 Arvid Heise  wrote:

> Hi Avi,
>
> thanks for clarifying.
>
> It seems like it's not possible to parse Parquet in Flink without knowing
> the schema. What i'd do is to parse the metadata while setting up the job
> and then pass it to the input format:
>
> ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path, fileSize);
> FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
> MessageType fileSchema = fileMetaData.getSchema();
>
> Quite possibly that's what Spark is doing under hood. If you open a ticket
> with a feature request, we will add it in the future.
>
> On Thu, Mar 11, 2021 at 6:26 PM Avi Levi  wrote:
>
>> Hi Arvid,
>> assuming that I have A0,B0,C0 parquet files with different schema and a
>> common field *ID*, I want to write them to A1,B2,C3 files respectively.
>> My problem is that in my code I do not want to know the full schema just by
>> filtering using the ID field and writing the unfiltered lines to the
>> destination file. each source file should have a matching destination file
>> I tried to implement it using the ParquetInputFormat but I need to define
>> the schema in advance (MessageType) .
>>
>> class ParquetInput(path: Path,  messageType: MessageType) extends 
>> ParquetInputFormat[Row](path, messageType){
>>
>> I am looking for a way that my code will be agnostic to the schema and
>> will only know the "ID" field (just like in spark) e.g *val filtered =
>> rawsDF.filter(col("id") != "123")*
>>
>> Thanks
>> Avi
>>
>> On Thu, Mar 11, 2021 at 2:53 PM Arvid Heise  wrote:
>>
>>> Hi Avi,
>>>
>>> I'm not entirely sure I understand the question. Let's say you have
>>> source A, B, C all with different schema but all have an id. You could use
>>> the ParquetMapInputFormat that provides a map of the records and just use a
>>> map-lookup.
>>>
>>> However, I'm not sure how you want to write these records with different
>>> schema into the same parquet file. Maybe, you just want to extract the
>>> common fields of A, B, C? Then you can also use Table API and just declare
>>> the fields that are common.
>>>
>>> Or do you have sink A, B, C and actually 3 separate topologies?
>>>
>>> On Wed, Mar 10, 2021 at 10:50 AM Avi Levi  wrote:
>>>
 Hi all,
 I am trying to filter lines from parquet files, the problem is that
 they have different schemas, however the field that I am using to filter
 exists in all schemas.
 in spark this is quite straight forward :

 *val filtered = rawsDF.filter(col("id") != "123")*

 I tried to do it in flink by extending the ParquetInputFormat but in
 this case I need to schema (message type) and implement Convert method
 which I want to avoid since I do not want to convert the line (I want to
 write is as is to other parquet file)

 Any ideas ?

 Cheers
 Avi




Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
I intend to augment every event in a session  with a unique ID.  To keep
the session lean, there is a PurgingTrigger on this aggregate that  fires
on a count of 1.

>> (except that the number of keys can grow).

Want to confirm that the keys are GCed ( along with state ) once the
(windows close + lateness ) ?



On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan  wrote:

> Hi Vishal,
>
> There is no leak in the code you provided (except that the number of
> keys can grow).
> But as you figured out the state is scoped to key, not to window+key.
>
> Could you explain what you are trying to achieve and why do you need to
> combine
> sliding windows with state scoped to window+key?
>
> Regards,
> Roman
>
> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>  wrote:
> >
> > Essentially, Does this code leak state
> >
> > private static class SessionIdProcessWindowFunction java.io.Serializable, VALUE extends java.io.Serializable>
> > extends
> > ProcessWindowFunction,
> KeyedSessionWithSessionID, KEY, TimeWindow> {
> > private static final long serialVersionUID = 1L;
> > private final static ValueStateDescriptor sessionId = new
> ValueStateDescriptor("session_uid",
> > String.class);
> >
> > @Override
> > public void process(KEY key,
> > ProcessWindowFunction,
> KeyedSessionWithSessionID, KEY, TimeWindow>.Context context,
> > Iterable> elements,
> Collector> out)
> > throws Exception {
> > // I need this scoped to key/window
> > if (getRuntimeContext().getState(sessionId).value() == null) {
> > UUID uuid = UUID.randomUUID();
> > getRuntimeContext().getState(sessionId).update(uuid.toString());
> > }
> > String uuid = getRuntimeContext().getState(sessionId).value();
> > out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(),
> uuid));
> > }
> > }
> >
> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
> >>
> >> Hello folks,
> >>   The suggestion is to use windowState() for a key key
> per window state and clear the state explicitly.  Also it seems that
> getRuntime().getState() will return a globalWindow() where state is shared
> among windows with the same key. I desire of course to have state scoped to
> a key per window and was wanting to use windowState().. The caveat is that
> my window is a Session Window and when I try to use clear()  I am thrown
> this exception  ( Session Windows are Merging Windows )
> >>
> >> Caused by: java.lang.UnsupportedOperationException: Per-window state is
> not allowed when using merging windows.
> >>
> >>
> >> The questions are
> >>
> >> * How do I have state per session window/ per key and still be able to
> clear it ?
> >> * Does getRuntime().getState() give me the clear() semantics for free
> along with state per window per key and thus I  have understood
> getRuntime().getState() wrong ?
> >>
> >> Regards.
> >>
> >>
> >>
>


Re: How to debug checkpoint/savepoint stuck in Flink 1.12.2

2021-03-12 Thread Arvid Heise
Yes, please send me the full stack trace. You could also send it to me
personally if you don't want to share it on the ML.

I'm especially interested in the legacy source thread that holds the lock
0x00058e8c5070 if you only want to share an excerpt.

On Fri, Mar 12, 2021 at 2:29 AM ChangZhuo Chen (陳昌倬) 
wrote:

> On Thu, Mar 11, 2021 at 02:14:32PM +0100, Arvid Heise wrote:
> > Hi ChangZhuo,
> >
> > Did you upgrade to Flink 1.12.2 and change the settings at the time? If
> so,
> > could you maybe reset the settings to the old values on Flink 1.12.2 and
> > check if the job still gets stuck? Especially, turning off unaligned
> > checkpoints (UC) should clarify if it's a general issue in Flink 1.12.2
> or
> > with UC.
> >
> > If it's indeed an issue with UC, then it would help to get the debug logs
> > in particular for the package
> > org.apache.flink.streaming.runtime.io.checkpointing. You could add the
> > following to your log4js.properties (set general log level to INFO).
> >
> > logger.checkpointing.name =
> org.apache.flink.streaming.runtime.io.checkpointing
> > logger.checkpointing.level = DEBUG
>
> * Thanks for this information, we are working on this one, will reply
>   when we get log.
>
> * Also, we got the stack trace when the checkpoint got stuck; please let us
>   know if you need the full trace.
>
>   * The stuck task in UI is KafkaProducer -> ProcessFunction 128
>   * The following is BLOCKED thread for Source: KafkaProducer ->
> ProcessFunction (129/140)#2
>
> "Source: KafkaProducer -> ProcessFunction (129/140)#2" #66336 prio=5
> os_prio=0 cpu=582.01ms elapsed=5079.15s tid=0x7feb32717000 nid=0x9696
> waiting for monitor entry  [0x7feb28b61000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> - waiting to lock <0x00058e8c5070> (a java.lang.Object)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)
>
> ps:
> * The original UID is redacted by their underlying type.
> * It looks like subtask id in UI is off-by-one in stacktrace.
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: Evenly Spreading Out Source Tasks

2021-03-12 Thread Matthias Pohl
Hi Aeden,
just to be sure: all task managers have the same hardware/memory
configuration, don't they? I'm not 100% sure whether this affects the
slot selection in the end, but it looks like this parameter also has an
influence on the slot matching strategy, which prefers slots with less
resource utilization [1].

I'm gonna add Chesnay to the thread. He might have more insights here.
@Chesnay are there any other things that might affect the slot selection
when actually trying to evenly spread out the slots?

Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141

On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson 
wrote:

> Hi Arvid,
>
>   Thanks for responding. I did check the configuration tab of the job
> manager and the setting cluster.evenly-spread-out-slots: true is
> there. However I'm still observing unevenness in the distribution of
> source tasks. Perhaps this additional information could shed light.
>
> Version: 1.12.1
> Deployment Mode: Application
> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
> Flink operator https://github.com/lyft/flinkk8soperator
>
> I did place the setting under the flinkConfig section,
>
> apiVersion: flink.k8s.io/v1beta1
> 
> spec:
>   flinkConfig:
> cluster.evenly-spread-out-slots: true
> high-availability: zookeeper
> ...
> state.backend: filesystem
> ...
>   jobManagerConfig:
> envConfig:
> 
>
> Would you explain how the setting ends up evenly distributing active
> kafka consumers? Is it a result of just assigning tasks toTM1, TM2,
> TM3 ... TM18 in order and starting again. In my case I have 36
> partitions and 18 nodes so after the second pass in assignment I would
> end up with 2 subtasks in the consumer group on each TM. And then
> subsequent passes result in inactive consumers.
>
>
> Thank you,
> Aeden
>
> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise  wrote:
> >
> > Hi Aeden,
> >
> > the option that you mentioned should have actually caused your desired
> behavior. Can you double-check that it's set for the job (you can look at
> the config in the Flink UI to be 100% sure).
> >
> > Another option is to simply give all task managers 2 slots. In that way,
> the scheduler can only evenly distribute.
> >
> > On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson 
> wrote:
> >>
> >> I have a cluster with 18 task managers 4 task slots each running a
> >> job whose source/sink(s) are declared with FlinkSQL using the Kafka
> >> connector. The topic being read has 36 partitions. The problem I'm
> >> observing is that the subtasks for the sources are not evenly
> >> distributed. For example, 1 task manager will have 4 active source
> >> subtasks and other TM's none. Is there a way to force  each task
> >> manager to have 2 active source subtasks.  I tried using the setting
> >> cluster.evenly-spread-out-slots: true , but that didn't have the
> >> desired effect.
> >>
> >> --
> >> Thank you,
> >> Aeden


Re: Guidelines for setting task slots when running multiple jobs in a Flink cluster

2021-03-12 Thread Roman Khachatryan
Hi,

Do I understand correctly that:
1. The workload varies across the jobs but stays the same for the same job
2. With a small number of slots per TM you are concerned about uneven
resource utilization when running low- and high-intensive jobs on the
same cluster simultaneously?

If so, wouldn't reducing parallelism of low-intensive jobs help?
Other options to consider are putting subtasks of high-intensive job
into different slot-sharing groups; or breaking operator chains
explicitly [1]

There are also a number of improvements coming in 1.13 release: [2][3][4].

I'm pulling in Till and Robert who knows this area better.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[2] https://issues.apache.org/jira/browse/FLINK-21267
[3] https://issues.apache.org/jira/browse/FLINK-10404
[4] https://issues.apache.org/jira/browse/FLINK-14187
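
To illustrate the slot-sharing-group / chaining options mentioned above, a rough sketch (`source` is assumed to be a DataStream<String>; the operator classes and the parallelism are made-up placeholders, not a drop-in solution):

// isolate the CPU-heavy part in its own slot sharing group and its own task
DataStream<String> heavy = source
        .keyBy(value -> value)
        .process(new ExpensiveKeyedFunction())   // hypothetical CPU-intensive operator
        .slotSharingGroup("cpu-heavy")
        .disableChaining();

// the low-intensive branch can simply run with a lower parallelism
DataStream<String> light = source
        .map(new CheapEnrichmentFunction())      // hypothetical I/O-bound operator
        .setParallelism(2);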

Regards,
Roman

On Fri, Mar 12, 2021 at 5:03 AM Sush Bankapura
 wrote:
>
> Hi,
>
> We  have multiple jobs that need to be deployed to a Flink cluster. 
> Parallelism for jobs vary and dependent on the type of work being done  and 
> so are the memory requirements. All jobs currently use the same state 
> backend.  Since the workloads handled by each job is different, the scaling 
> pattern also varies. We run all our jobs in a  single Flink cluster (7 VMs 
> with the same instance configuration)
>
>  Most of what I have read in the Flink documentation indicates any of the 
> following for setting the task slots
>
> 1. As a rule of thumb, a good default number of task slots will be the number 
> of CPU cores. With hyper-threading, each slot then takes 2 or more hardware 
> thread contexts. If you are doing any Blocking IO operations in Flink job, it 
> is suggested to have more number of slots than the core.
>
> 2. A Flink cluster needs exactly as many task slots as the highest 
> parallelism used in the job. No need to calculate how many tasks (with 
> varying parallelism) a program contains in total.
>
> I did not find documentation  for the task slot setting for the scenario I 
> have enumerated. While setting a lower value for the task slots seems to work 
> better for jobs which need to process high amounts of traffic than the other 
> jobs which process lower amounts of traffic, but this will be inefficient if 
> the slots are assigned to jobs which work on lower volumes of traffic.
>
> > Depending on the workload handled by each Flink job, it seems that we would
> > need to set up as many clusters.
>
> 1. Is this the only option available?
> 2. Are there any guidelines on deciding on the number of task slots in such 
> an environment?
>
> Thanks,
> Sushruth


Re: How to do code isolation if muiltple jobs run on the same taskmanager process?

2021-03-12 Thread Arvid Heise
Hi Lei,

yes, metaspace would run out eventually if you run too much in parallel.
All finished jobs will close the classloaders and free the metaspace memory.

For newer setups, we recommend creating an ad-hoc cluster for each Flink
application for this and several other reasons. If you are already using
K8s, I'd definitively switch to per-application clusters.

You could also have a look at the Ververica community editions for easy
managing of such applications. [1]

[1] https://www.ververica.com/getting-started

On Fri, Mar 12, 2021 at 4:15 AM Lei Wang  wrote:

> Thanks Arvid,
>
> If too many jobs run in the same task manager JVM,  will it  cause too
> much metaspace memory occupation?
>
>
> Thanks,
> Lei
>
> On Thu, Mar 11, 2021 at 11:54 PM Arvid Heise  wrote:
>
>> Hi Lei,
>>
>> each application has its own classloader as such each static constant
>> exists multiple times (1 per job). So there should be no interference. You
>> could verify it by logging the value of the constant and see it yourself.
>>
>> Best,
>>
>> Arvid
>>
>> On Thu, Mar 11, 2021 at 7:11 AM Lei Wang  wrote:
>>
>>> Consider the following situation.
>>>
> >>> Two jobs built in the same jar, they will share some common code, for
> >>> example, some static constant variables.
>>> Currently they are running on the same task manager process.
>>>
>>> I  killed job1, changed the static variable to another and then submit
>>> it again.
>>> Does another job will get the new value of the static variable or still
>>> use the old value?
>>>
>>> Thanks,
>>> Lei
>>>
>>>
>>>


Re: Filtering lines in parquet

2021-03-12 Thread Arvid Heise
Hi Avi,

thanks for clarifying.

It seems like it's not possible to parse Parquet in Flink without knowing
the schema. What I'd do is parse the metadata while setting up the job
and then pass it to the input format:

ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path, fileSize);
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
MessageType fileSchema = fileMetaData.getSchema();

Quite possibly that's what Spark is doing under the hood. If you open a ticket
with a feature request, we will add it in the future.
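
To make that concrete, a rough sketch of feeding the discovered schema into Flink's ParquetRowInputFormat and filtering on the id field (the footer-reading call is the one from the snippet above; the path, the field name "id" and its string type are assumptions):

// discover the schema from the file footer while setting up the job
ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path, fileSize);
MessageType fileSchema = parquetMetadata.getFileMetaData().getSchema();
int idIndex = fileSchema.getFieldIndex("id");

// build the input format with the discovered schema; the job code itself stays schema-agnostic
ParquetRowInputFormat inputFormat =
        new ParquetRowInputFormat(new org.apache.flink.core.fs.Path("s3://bucket/a0.parquet"), fileSchema);
DataStream<Row> filtered = env
        .createInput(inputFormat)
        .filter(row -> !"123".equals(row.getField(idIndex)));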

On Thu, Mar 11, 2021 at 6:26 PM Avi Levi  wrote:

> Hi Arvid,
> assuming that I have A0,B0,C0 parquet files with different schema and a
> common field *ID*, I want to write them to A1,B2,C3 files respectively.
> My problem is that in my code I do not want to know the full schema just by
> filtering using the ID field and writing the unfiltered lines to the
> destination file. each source file should have a matching destination file
> I tried to implement it using the ParquetInputFormat but I need to define
> the schema in advance (MessageType) .
>
> class ParquetInput(path: Path,  messageType: MessageType) extends 
> ParquetInputFormat[Row](path, messageType){
>
> I am looking for a way that my code will be agnostic to the schema and
> will only know the "ID" field (just like in spark) e.g *val filtered =
> rawsDF.filter(col("id") != "123")*
>
> Thanks
> Avi
>
> On Thu, Mar 11, 2021 at 2:53 PM Arvid Heise  wrote:
>
>> Hi Avi,
>>
>> I'm not entirely sure I understand the question. Let's say you have
>> source A, B, C all with different schema but all have an id. You could use
>> the ParquetMapInputFormat that provides a map of the records and just use a
>> map-lookup.
>>
>> However, I'm not sure how you want to write these records with different
>> schema into the same parquet file. Maybe, you just want to extract the
>> common fields of A, B, C? Then you can also use Table API and just declare
>> the fields that are common.
>>
>> Or do you have sink A, B, C and actually 3 separate topologies?
>>
>> On Wed, Mar 10, 2021 at 10:50 AM Avi Levi  wrote:
>>
>>> Hi all,
>>> I am trying to filter lines from parquet files, the problem is that they
>>> have different schemas, however the field that I am using to filter
>>> exists in all schemas.
>>> in spark this is quite straight forward :
>>>
>>> *val filtered = rawsDF.filter(col("id") != "123")*
>>>
>>> I tried to do it in flink by extending the ParquetInputFormat but in
>>> this case I need to schema (message type) and implement Convert method
>>> which I want to avoid since I do not want to convert the line (I want to
>>> write is as is to other parquet file)
>>>
>>> Any ideas ?
>>>
>>> Cheers
>>> Avi
>>>
>>>


Re: User metrics outside tasks

2021-03-12 Thread Arvid Heise
Hi Bob and Alexey,

I double-checked and there is currently no way to achieve what you want.

The good news is that the OOM part should be addressed through FLINK-20833
[1], maybe it's even suitable for other issues.

A "workaround" (I don't think it's a workaround) for your issues would
actually be to check the logs and automatically parse certain patterns.
That should be a common setup for all big data applications. Here you are
free to emit whatever you want. The parsed entries should go into your
metric systems if applicable.

[1] https://issues.apache.org/jira/browse/FLINK-20833

On Thu, Mar 11, 2021 at 6:05 PM Bob Tiernay  wrote:

> I too think this would be a useful capability for the job manager to be
> able
> to send metrics easily. Sometimes additional compute responsibilities are
> placed in the job manager and having a convenient way to add telemetry data
> into a metrics stream would be very useful.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Roman Khachatryan
Hi Vishal,

There is no leak in the code you provided (except that the number of
keys can grow).
But as you figured out the state is scoped to key, not to window+key.

Could you explain what you are trying to achieve and why do you need to combine
sliding windows with state scoped to window+key?
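
For reference, the two state scopes available inside a ProcessWindowFunction look roughly like this (a sketch; sessionIdDescriptor is assumed to be a ValueStateDescriptor<String> field, and windowState() is the per-key-and-per-window variant that is rejected for merging windows such as session windows):

@Override
public void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
    // scoped to the key only, shared by all windows of that key
    ValueState<String> global = context.globalState().getState(sessionIdDescriptor);

    // scoped to the key AND the current window; throws UnsupportedOperationException
    // when the window assigner is a merging one (e.g. session windows)
    ValueState<String> perWindow = context.windowState().getState(sessionIdDescriptor);
}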

Regards,
Roman

On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
 wrote:
>
> Essentially, Does this code leak state
>
> private static class SessionIdProcessWindowFunction java.io.Serializable, VALUE extends java.io.Serializable>
> extends
> ProcessWindowFunction, 
> KeyedSessionWithSessionID, KEY, TimeWindow> {
> private static final long serialVersionUID = 1L;
> private final static ValueStateDescriptor sessionId = new 
> ValueStateDescriptor("session_uid",
> String.class);
>
> @Override
> public void process(KEY key,
> ProcessWindowFunction, 
> KeyedSessionWithSessionID, KEY, TimeWindow>.Context context,
> Iterable> elements, 
> Collector> out)
> throws Exception {
> // I need this scoped to key/window
> if (getRuntimeContext().getState(sessionId).value() == null) {
> UUID uuid = UUID.randomUUID();
> getRuntimeContext().getState(sessionId).update(uuid.toString());
> }
> String uuid = getRuntimeContext().getState(sessionId).value();
> out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), 
> uuid));
> }
> }
>
> On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi  
> wrote:
>>
>> Hello folks,
>>   The suggestion is to use windowState() for a key key per 
>> window state and clear the state explicitly.  Also it seems that 
>> getRuntime().getState() will return a globalWindow() where state is shared 
>> among windows with the same key. I desire of course to have state scoped to 
>> a key per window and was wanting to use windowState().. The caveat is that 
>> my window is a Session Window and when I try to use clear()  I am thrown 
>> this exception  ( Session Windows are Merging Windows )
>>
>> Caused by: java.lang.UnsupportedOperationException: Per-window state is not 
>> allowed when using merging windows.
>>
>>
>> The questions are
>>
>> * How do I have state per session window/ per key and still be able to clear 
>> it ?
>> * Does getRuntime().getState() give me the clear() semantics for free along 
>> with state per window per key and thus I  have understood  
>> getRuntime().getState() wrong ?
>>
>> Regards.
>>
>>
>>


Re: No saving data using rocksdb

2021-03-12 Thread Roman Khachatryan
Are you starting the job from savepoint [1] when submitting it again?
If not, it is considered as a new job and will not pick up the old state.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#starting-a-job-from-a-savepoint
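
For example (a sketch with placeholder paths and ids), stopping with a savepoint and resuming from it looks like:

./bin/flink stop --savepointPath /home/flink/savepoints <jobId>
./bin/flink run -s /home/flink/savepoints/savepoint-xxxx-yyyy your-job.jar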

Regards,
Roman


On Fri, Mar 12, 2021 at 10:08 AM Maminspapin  wrote:
>
> Hey, Roman
>
> I use every time the same key.
> And I get the correct value in StateManager every time the processElement()
> method executes.
>
> But then I stop the job and submit it again.
> And first execution processElement() get me null in state store. The key
> wasn't change.
>
> So, I'am in confuse
>
> Thanks,
> Yuri L.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


No saving data using rocksdb

2021-03-12 Thread Maminspapin
I have the following piece of configuration in flink-conf.yaml:

Key                                  Value
high-availability                    zookeeper
high-availability.storageDir         file:///home/flink/flink-ha-data
high-availability.zookeeper.quorum   localhost:2181
state.backend                        rocksdb
state.backend.incremental            true
state.checkpoints.dir                file:///home/flink/checkpoints

And in my code (Main.class):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStateBackend(new RocksDBStateBackend("file:///home/flink/checkpoint-data", true));
env.enableCheckpointing(Duration.ofMinutes(5).toMillis());

Also, the next class should save data in the state store when an event is received:

public class StateManager extends KeyedProcessFunction<String, String, String> {

    private ValueState<String> events;

    @Override
    public void processElement(String s, Context context, Collector<String> collector) throws Exception {

        System.out.println("events: " + events.value()); // Check last value for this key

        Model model = new Gson().fromJson(s, Model.class);
        events.update(model.toString());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("state", Types.STRING);
        events = getRuntimeContext().getState(stateDescriptor);
        System.out.println("In open");
    }
}


But when I stop the job and start it again, I see no saved data. I check it
by printing the data to sysout: there is a null value after restarting the job.

But why do I get this behavior? Maybe my settings are not proper?

Thanks,
Yuri L.





Re: Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread Dan Hill
Thanks David!

On Fri, Mar 12, 2021, 01:54 David Anderson  wrote:

> WatermarkStrategy.withIdleness works by marking idle streams as idle, so
> that downstream operators will ignore those streams and allow the
> watermarks to progress based only on the advancement of the watermarks of
> the still active streams. As you suspected, this mechanism does not provide
> for the watermark to be advanced in situations where all of the streams are
> idle.
>
> If your goal is ensure that all of the events are processed and all
> event-time timers are fired (and all event-time windows are closed) before
> a job ends, Flink already includes a mechanism for this purpose. If you are
> using a bounded source, then when that source reaches the end of its input,
> a final Watermark of value Watermark.MAX_WATERMARK will be automatically
> emitted. The --drain option, as in
>
> ./bin/flink stop --drain 
>
> also has this effect [1].
>
> With a Kafka source, you can arrange for this to happen by having your
> kafka deserializer return true from its isEndOfStream() method. Or you
> could use the new KafkaSource connector included in Flink 1.12 with
> its setBounded option.
>
> On the other hand, if you really did need to advance the watermark despite
> a (possibly temporary) total lack of events, you could implement a
> watermark strategy that artificially advances the watermark based on the
> passage of processing time. You'll find an example in [2], though it hasn't
> been updated to use the new watermark strategy interface.
>
> Regards,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint
> [2]
> https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
>
> On Fri, Mar 12, 2021 at 9:47 AM Dan Hill  wrote:
>
>> I haven't been able to get WatermarkStrategy.withIdleness to work.  Is it
>> broken?  None of my timers trigger when I'd expect idleness to take over.
>>
>> On Tue, Mar 2, 2021 at 11:15 PM Dan Hill  wrote:
>>
>>> Hi.
>>>
>>> For local and tests development, I want to flush the events in my system
>>> to make sure I'm processing everything.  My watermark does not progress to
>>> finish all of the data.
>>>
>>> What's the best practice for local development or tests?
>>>
>>> If I use idle sources for 1 Kafka partition, this appears broken.  I'm
>>> guessing there is logic to prevent removing an idle partition if it's the
>>> only partition.  Is there a version of this I can enable for local
>>> development that supports 1 partition?
>>>
>>> I see this tech talk.  Are there other talks to watch?
>>> https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be
>>>
>>> Do I need to write my own watermark generator?  Or change my test data
>>> to have a way of generating watermarks?
>>>
>>> I've tried a few variants of the following source code.  The watermark
>>> doesn't progress in the operator right after creating the source.
>>>
>>> SingleOutputStreamOperator viewInput = env.addSource(...)
>>> .uid("source-view")
>>> .assignTimestampsAndWatermarks(
>>>
>>> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));
>>>
>>


Re: Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread David Anderson
WatermarkStrategy.withIdleness works by marking idle streams as idle, so
that downstream operators will ignore those streams and allow the
watermarks to progress based only on the advancement of the watermarks of
the still active streams. As you suspected, this mechanism does not provide
for the watermark to be advanced in situations where all of the streams are
idle.

If your goal is to ensure that all of the events are processed and all
event-time timers are fired (and all event-time windows are closed) before
a job ends, Flink already includes a mechanism for this purpose. If you are
using a bounded source, then when that source reaches the end of its input,
a final Watermark of value Watermark.MAX_WATERMARK will be automatically
emitted. The --drain option, as in

./bin/flink stop --drain <jobId>

also has this effect [1].

With a Kafka source, you can arrange for this to happen by having your
kafka deserializer return true from its isEndOfStream() method. Or you
could use the new KafkaSource connector included in Flink 1.12 with
its setBounded option.
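
A minimal sketch of the 1.12 KafkaSource variant (servers, topic and group id are placeholders):

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())   // stop at the current end offsets; a final MAX_WATERMARK is emitted
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream = env.fromSource(
        source,
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1)),
        "bounded-kafka-source");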

On the other hand, if you really did need to advance the watermark despite
a (possibly temporary) total lack of events, you could implement a
watermark strategy that artificially advances the watermark based on the
passage of processing time. You'll find an example in [2], though it hasn't
been updated to use the new watermark strategy interface.
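
A rough sketch of that idea on the new WatermarkGenerator interface (the maxDelay parameter is an assumption; this mirrors [2] but is not a tested implementation):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class ProcessingTimeTrailingWatermarks<T> implements WatermarkGenerator<T> {
    private final long maxDelayMillis;
    private long maxTimestamp;

    public ProcessingTimeTrailingWatermarks(Duration maxDelay) {
        this.maxDelayMillis = maxDelay.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + maxDelayMillis + 1; // avoid underflow before any event
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // keep advancing even without events: take the larger of the usual event-time
        // watermark and "current processing time minus the allowed delay"
        long eventTimeWatermark = maxTimestamp - maxDelayMillis - 1;
        long processingTimeWatermark = System.currentTimeMillis() - maxDelayMillis;
        output.emitWatermark(new Watermark(Math.max(eventTimeWatermark, processingTimeWatermark)));
    }
}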

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint
[2]
https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java

On Fri, Mar 12, 2021 at 9:47 AM Dan Hill  wrote:

> I haven't been able to get WatermarkStrategy.withIdleness to work.  Is it
> broken?  None of my timers trigger when I'd expect idleness to take over.
>
> On Tue, Mar 2, 2021 at 11:15 PM Dan Hill  wrote:
>
>> Hi.
>>
>> For local and tests development, I want to flush the events in my system
>> to make sure I'm processing everything.  My watermark does not progress to
>> finish all of the data.
>>
>> What's the best practice for local development or tests?
>>
>> If I use idle sources for 1 Kafka partition, this appears broken.  I'm
>> guessing there is logic to prevent removing an idle partition if it's the
>> only partition.  Is there a version of this I can enable for local
>> development that supports 1 partition?
>>
>> I see this tech talk.  Are there other talks to watch?
>> https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be
>>
>> Do I need to write my own watermark generator?  Or change my test data to
>> have a way of generating watermarks?
>>
>> I've tried a few variants of the following source code.  The watermark
>> doesn't progress in the operator right after creating the source.
>>
>> SingleOutputStreamOperator viewInput = env.addSource(...)
>> .uid("source-view")
>> .assignTimestampsAndWatermarks(
>>
>> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));
>>
>


Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread Dan Hill
I haven't been able to get WatermarkStrategy.withIdleness to work.  Is it
broken?  None of my timers trigger when I'd expect idleness to take over.

On Tue, Mar 2, 2021 at 11:15 PM Dan Hill  wrote:

> Hi.
>
> For local and tests development, I want to flush the events in my system
> to make sure I'm processing everything.  My watermark does not progress to
> finish all of the data.
>
> What's the best practice for local development or tests?
>
> If I use idle sources for 1 Kafka partition, this appears broken.  I'm
> guessing there is logic to prevent removing an idle partition if it's the
> only partition.  Is there a version of this I can enable for local
> development that supports 1 partition?
>
> I see this tech talk.  Are there other talks to watch?
> https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be
>
> Do I need to write my own watermark generator?  Or change my test data to
> have a way of generating watermarks?
>
> I've tried a few variants of the following source code.  The watermark
> doesn't progress in the operator right after creating the source.
>
> SingleOutputStreamOperator viewInput = env.addSource(...)
> .uid("source-view")
> .assignTimestampsAndWatermarks(
>
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));
>


Re: Gradually increasing checkpoint size

2021-03-12 Thread Dan Hill
I figured it out.  I have some records with the same key and I was doing an
IntervalJoin.  In the IntervalJoin implementation that I found, it looks
like the runtime increases exponentially when there are duplicate keys.
I introduced a de-duping step and it works a lot faster.
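
For reference, a sketch of the kind of de-duping step described above, keyed on the join key and forwarding only the first record seen per key (the Event type and getJoinKey() accessor are made up):

DataStream<Event> deduped = events
        .keyBy(Event::getJoinKey)
        .process(new KeyedProcessFunction<String, Event, Event>() {
            private transient ValueState<Boolean> seen;

            @Override
            public void open(Configuration parameters) {
                seen = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("seen", Types.BOOLEAN));
            }

            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                // emit only the first record per key; subsequent duplicates are dropped
                if (seen.value() == null) {
                    seen.update(true);
                    out.collect(value);
                }
            }
        });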

On Thu, Mar 11, 2021 at 5:30 AM Dawid Wysakowicz 
wrote:

> Hey Dan,
>
> I think the logic should be correct. Mind that in the processElement we
> are using *relative*Upper/LowerBound, which are inverted global bound:
>
> relativeUpperBound = upperBound for left and -lowerBound for right
>
> relativeLowerBound = lowerBound for left and -upperBound for right
>
> Therefore the cleaning logic in onTimer effectively uses the same logic.
> If I understand it correctly, this trick was introduced to deduplicate the
> method.
>
> There might be a bug somewhere, but I don't think it's where you pointed.
> I'd suggest to first investigate the progress of watermarks.
>
> Best,
>
> Dawid
> On 09/03/2021 08:36, Dan Hill wrote:
>
> Hi Yun!
>
> That advice was useful.  The state for that operator is very small
> (31kb).  Most of the checkpoint size is in a couple simple
> DataStream.intervalJoin operators.  The time intervals are fairly short.
>
> I'm going to try running the code with some small configuration changes.
> One thing I did notice is that I set a positive value for the
relativeUpperBound.  I'm not sure if I found a bug in IntervalJoinOperator.
The logic in IntervalJoinOperator.onEventTime needs an exact timestamp for
clean up.  It has some logic around cleaning up the right side that uses
timerTimestamp + lowerBound. However, processElement doesn't use the same
logic when creating a timer (I only see + lowerBound). Maybe I'm misreading
the code.  It feels like a bug.
>
>
> On Mon, Mar 8, 2021 at 10:29 PM Yun Gao  wrote:
>
>> Hi Dan,
>>
>> Regarding the original checkpoint size problem, could you also have a
>> check
>> which tasks' state are increasing from the checkpoint UI ? For example,
>> the
>> attached operator has a `alreadyOutputed` value state, which seems to keep
>> increasing if there are always new keys ?
>>
>> Best,
>> Yun
>>
>>
>> --Original Mail --
>> *Sender:*Dan Hill 
>> *Send Date:*Tue Mar 9 00:59:24 2021
>> *Recipients:*Yun Gao 
>> *CC:*user 
>> *Subject:*Re: Gradually increasing checkpoint size
>>
>>> Hi Yun!
>>>
>>> Thanks for the quick reply.
>>>
>>> One of the lowerBounds is large but the table being joined with is ~500
>>> rows.  I also have my own operator that only outputs the first value.
>>>
>>> public class OnlyFirstUser extends
>>> RichFlatMapFunction {
>>>
>>>
>>> private transient ValueState alreadyOutputted;
>>>
>>>
>>> @Override
>>>
>>> public void flatMap(T value, Collector out) throws Exception {
>>>
>>> if (!alreadyOutputted.value()) {
>>>
>>> alreadyOutputted.update(true);
>>>
>>> out.collect(value);
>>>
>>> }
>>>
>>> }
>>>
>>>
>>> @Override
>>>
>>> public void open(Configuration config) {
>>>
>>> ValueStateDescriptor descriptor =
>>>
>>> new ValueStateDescriptor<>(
>>>
>>> "alreadyOutputted", // the state name
>>>
>>> TypeInformation.of(new TypeHint() {}),
>>> // type information
>>>
>>> false); // default value of the state, if
>>> nothing was set
>>>
>>> alreadyOutputted = getRuntimeContext().getState(descriptor);
>>>
>>> }
>>>
>>> }
>>>
>>> All of my inputs have this watermark strategy.  In the Flink UI, early
>>> in the job run, I see "Low Watermarks" on each node and they increase.
>>> After some checkpoint failures, low watermarks stop appearing in the UI
>>> 
>>> .
>>>
>>>
>>> .assignTimestampsAndWatermarks(
>>>
>>>
>>> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));
>>>
>>>
>>>
>>> Thanks Yun!
>>>
>>>
>>> On Mon, Mar 8, 2021 at 7:27 AM Yun Gao  wrote:
>>>
 Hi Dan,

 Have you use a too large upperBound or lowerBound?

 If not, could you also check the watermark strategy ?
 The interval join operator depends on the event-time
 timer for cleanup, and the event-time timer would be
 triggered via watermark.

 Best,
 Yun


 --Original Mail --
 *Sender:*Dan Hill 
 *Send Date:*Mon Mar

Re: No saving data using rocksdb

2021-03-12 Thread Roman Khachatryan
Hi Yuri,

The state that you access with getRuntimeContext().getState(...) is
scoped to the key (so for every new key this state will be null).
What key do you use?

Regards,
Roman

On Fri, Mar 12, 2021 at 7:22 AM Maminspapin  wrote:
>
> I have following piece of configuration in flink.yaml:
>
> Key Value
> high-availability   zookeeper
> high-availability.storageDir
> file:///home/flink/flink-ha-data
> high-availability.zookeeper.quorum  localhost:2181
> state.backend   rocksdb
> state.backend.incremental   true
> state.checkpoints.dir   
> file:///home/flink/checkpoints
>
> And in my code (Main.class):
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setStateBackend(new
> RocksDBStateBackend("file:///home/flink/checkpoint-data", true));
> env.enableCheckpointing(Duration.ofMinutes(5).toMillis());
>
> Also the next class should to save data in store, when event is received:
>
> public class StateManager extends KeyedProcessFunction String> {
>
> private ValueState events;
>
>
> @Override
> public void processElement(String s, Context context, Collector
> collector) throws Exception {
>
> System.out.println("events: " + events.value()); // Check last value
> for this key
>
> Model model = new Gson().fromJson(s, Model.class);
> events.update(model.toString());
> }
>
> @Override
> public void open(Configuration parameters) throws Exception {
> ValueStateDescriptor stateDescriptor = new
> ValueStateDescriptor<>("state", Types.STRING);
> events = getRuntimeContext().getState(stateDescriptor);
> System.out.println("In open");
> }
> }
>
>
> But when I stop a job and start it again no saving data I see. I check it
> with printing data to sysout. There is null value after restarting job.
>
> But why do I get this behavior? Maybe my settings is not proper?
>
> Thanks,
> Yuri L.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/