Flink SQL 1.12: How to query an HBase table via a secondary index

2022-03-16 Thread WuKong
Hi,
my data is stored in HBase and I want to use Flink to implement a temporal join of a Kafka table
against the HBase table, but the join condition is not the rowkey. I have implemented an HBase
secondary index. How can I do this with Flink SQL: first query the secondary index (stored in
Elasticsearch, for example) and then use the returned rowkey to query the HBase table? Do I have
to implement my own HBase secondary-index connector?
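
For context, a minimal sketch (not from the original mail; table and column names such as
kafka_events, hbase_table and cf.col1 are assumptions, and both tables are assumed to be
registered already) of the built-in HBase lookup join in Flink SQL. It resolves rows by rowkey
only, which is why a non-rowkey condition needs an extra indirection such as a secondary index
in Elasticsearch, or a custom lookup connector that performs the two-step lookup itself:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HbaseLookupJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // A processing-time temporal ("lookup") join against the HBase connector
        // only works when the equality condition is on the HBase rowkey:
        tEnv.executeSql(
                "SELECT k.id, k.payload, h.cf.col1 "
                        + "FROM kafka_events AS k "
                        + "JOIN hbase_table FOR SYSTEM_TIME AS OF k.proc_time AS h "
                        + "ON k.id = h.rowkey");

        // A non-rowkey condition (e.g. k.some_attr = h.cf.col2) is not supported by the
        // HBase lookup source, so it would have to be resolved first, e.g. by looking up
        // the rowkey in the secondary index (Elasticsearch) inside a user-defined function
        // or a custom lookup table source, and only then joining on h.rowkey.
    }
}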



---
Best,
WuKong


Potential Bug with Date Serialization for Table Stream

2022-03-16 Thread Tom Thornton
Per the docs, I'm
hoping to confirm whether or not an error we are seeing is a bug in
Flink. We have a job that uses a Kafka source to read Avro records. The
Kafka source is converted into a StreamTableSource. We are using the new
Blink table planner to execute SQL on the table stream. The output is then
put in a sink back to Kafka as Avro records. Whenever a query selects a
column that has an Avro logicalType of date, we get this error (link to full
stack trace):

Caused by: java.lang.ClassCastException: class java.sql.Date cannot be
cast to class java.time.LocalDate (java.sql.Date is in module java.sql
of loader 'platform'; java.time.LocalDate is in module java.base of
loader 'bootstrap')
at 
org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:128)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:61)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:755)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
at java.base/java.lang.Thread.run(Thread.java:829)


The avro schema definition for a date field is as follows:

{
  "name": "date",
  "type": {
    "type": "int",
    "logicalType": "date"
  },
  "doc": "date"
},

Any query that selects a date column would produce the error (and any
query without a column with type date will work). Example of a query
that causes the error:

select `date` from table1

As suggested in the docs, I also tried this with parent-first loading
and got the same error. When we run the same job without the Blink
table planner, i.e., useOldPlanner(), we do not get this error. Is
this a bug with Flink? Or is there something we can change in the
application code to prevent this error? Any help/suggestions would be
appreciated.
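
One possible workaround sketch (an assumption, not a confirmed fix: it presumes the job has
access to the records as Rows at some point before the failing operator, and that the index of
the date column is known) is to normalize java.sql.Date to java.time.LocalDate yourself, which
is the type the Blink planner's DATE serializer expects:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.Row;

// Hypothetical workaround: rewrite java.sql.Date fields as java.time.LocalDate before
// the rows reach operators that use LocalDateSerializer (the cast in the stack trace).
public class DateFieldNormalizer implements MapFunction<Row, Row> {

    private final int dateFieldIndex; // position of the DATE column in the Row

    public DateFieldNormalizer(int dateFieldIndex) {
        this.dateFieldIndex = dateFieldIndex;
    }

    @Override
    public Row map(Row row) {
        Object value = row.getField(dateFieldIndex);
        if (value instanceof java.sql.Date) {
            // toLocalDate() keeps the calendar date and drops the (zero) time-of-day part
            row.setField(dateFieldIndex, ((java.sql.Date) value).toLocalDate());
        }
        return row;
    }
}

Applied with something like rowStream.map(new DateFieldNormalizer(0)) before the table
conversion; whether that is feasible depends on how the StreamTableSource is built.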


Stateful function with GCP Pub/Sub ingress/egress

2022-03-16 Thread David Dixon
The statefun docs have some nice examples of how to use Kafka and Kinesis
for ingress/egress in conjunction with a function. Is there some
documentation or example code I could reference to do the same with a GCP
Pub/Sub topic? Thanks.

Dave


RE: Watermarks event time vs processing time

2022-03-16 Thread Schwalbe Matthias
Hi Hanspeter,

Event time mode should work just the same … for your example below you need only one single
arbitrary event per Kafka partition with a timestamp > 1646992800560 + sessionWindowGap +
outOfOrderness in order for the session window to be triggered.

I'm not sure why the processing time window does not work without watermarking configured
(I never use processing time mode).
You need to consider what consistency guarantees you need in processing time mode: if the job
fails and is restarted (or if network I/O exhibits short hiccups beyond your session gap), then
you might get results that split a single transaction_id into multiple session windows …
The choice is yours 😊

As to the aggregation method: current event time minus last event time … not min/max …
otherwise no different 😊

If you want to find out why event time mode blocks, you might find it useful to monitor the
watermarks of individual operators / subtasks:
look for subtasks that have no watermarks, or watermarks too low for a specific session window
to trigger.


Thias


From: HG 
Sent: Wednesday, 16 March 2022 16:41
To: Schwalbe Matthias 
Cc: user 
Subject: Re: Watermarks event time vs processing time



Hi Matthias and others

Thanks for the answer.
I will remove the Idleness.
However I am not doing max/min etc. Unfortunately most examples are about 
aggregations.

The inputs are like this
{"handling_time":1646992800260,"transaction_id":"017f6af1548e-119dfb",}
{"handling_time":1646992800290,"transaction_id":"017f6af1548e-119dfb",}
{"handling_time":1646992800360,"transaction_id":"017f6af1548e-119dfb",}
{"handling_time":1646992800560,"transaction_id":"017f6af1548e-119dfb",}
The output like this
{"handling_time":1646992800260,"transaction_id":"017f6af1548e-119dfb","elapse":0,}
{"handling_time":1646992800290,"transaction_id":"017f6af1548e-119dfb","elapse":30,}
{"handling_time":1646992800360,"transaction_id":"017f6af1548e-119dfb","elapse":70,}
{"handling_time":1646992800560,"transaction_id":"017f6af1548e-119dfb",,"elapse":200}

I started with handling_time as timestamp. But that did not workout well. I 
don't know why.
Then I switched to session processing time. Which is also OK because the 
outcomes of the elapsed time does not rely on the event time.

Then I thought 'let me remove the kafka watermark assigner.
But as soon as I did that no events would appear at the sink.
So I left both watermark timestamp assigners in place.
They do no harm it seems and leaving them out appears to do. It is not ideal 
but it works..
But I'd rather know the correct way how to set it up.

Regards Hans-Peter








On Wed 16 Mar 2022 at 15:52, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
Hi Hanspeter,

Let me relate some hints that might help you getting concepts clearer.

From your description I make following assumptions where your are not specific 
enough (please confirm or correct in your answer):

a.   You store incoming events in state per transaction_id to be 
sorted/aggregated(min/max time) by event time later on

b.   So far you used a session window to determine the point in time when 
to emit the stored/enriched/sorted events

c.Watermarks are generated with bounded out of orderness

d.   You use session windows with a specific gap

e.   In your experiment you ever only send 1000 events and then stop 
producing incoming events

Now to your questions:

  *   For processing time session windows, watermarks play no role whatsoever, 
you simply assume that you’ve seen all events belonging so a single transaction 
id if the last such event for a specific transaction id was processed 
sessionWindowGap milliseconds ago
  *   Therefore you see all enriched incoming events the latest 
sessionWindowGap ms after the last incoming event (+ some latency)
  *   In event time mode and resp event time session windows the situation is 
exactly the same, only that processing time play no role
  *   A watermark means (ideally) that no event older than the watermark time 
ever follows the watermark (which itself is a meta-event that flows with the 
proper events on the same channels)
  *   In order for a session gap window to forward the enriched events the 
window operator needs to receive a watermark that is sessionWindowGap 
milliseconds beyond the latest incoming event (in terms of the respective event 
time)
  *   The watermark generator in order to generate a new watermark that 
triggers this last session window above needs to encounter an (any) event that 
has a timestamp of ( + outOfOrderness + 
sessionWindowGap + 1ms)
  *   Remember, the watermark generator never generated watermarks based on 
processing time, but only based on the timestamps it has seen in events 
actually encountered
  *   Coming back to your idleness configuration: it only means

Re: Watermarks event time vs processing time

2022-03-16 Thread HG
Hi Matthias and others

Thanks for the answer.
I will remove the idleness setting.
However, I am not doing max/min etc. Unfortunately most examples are about
aggregations.

The inputs are like this
{"handling_time":1646992800260,"transaction_id":"017f6af1548e-119dfb",}
{"handling_time":1646992800290,"transaction_id":"017f6af1548e-119dfb",}
{"handling_time":1646992800360,"transaction_id":"017f6af1548e-119dfb",}
{"handling_time":1646992800560,"transaction_id":"017f6af1548e-119dfb",}
The output like this
{"handling_time":1646992800260,"transaction_id":"017f6af1548e-119dfb","elapse":0,}
{"handling_time":1646992800290,"transaction_id":"017f6af1548e-119dfb","elapse":30,}
{"handling_time":1646992800360,"transaction_id":"017f6af1548e-119dfb","elapse":70,}
{"handling_time":1646992800560,"transaction_id":"017f6af1548e-119dfb",,"elapse":200}

I started with handling_time as the timestamp, but that did not work out well and I
don't know why.
Then I switched to processing-time session windows, which is also OK because the elapsed-time
outcome does not rely on the event time.

Then I thought: let me remove the Kafka watermark assigner.
But as soon as I did that, no events would appear at the sink.
So I left both watermark timestamp assigners in place.
They seem to do no harm, while leaving them out appears to do harm. It is not
ideal, but it works.
But I'd rather know the correct way to set this up.

Regards Hans-Peter








On Wed 16 Mar 2022 at 15:52, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Hanspeter,
>
>
>
> Let me relate some hints that might help you getting concepts clearer.
>
>
>
> From your description I make following assumptions where your are not
> specific enough (please confirm or correct in your answer):
>
>1. You store incoming events in state per transaction_id to be
>sorted/aggregated(min/max time) by event time later on
>2. So far you used a session window to determine the point in time
>when to emit the stored/enriched/sorted events
>3. Watermarks are generated with bounded out of orderness
>4. You use session windows with a specific gap
>5. In your experiment you ever only send 1000 events and then stop
>producing incoming events
>
>
>
> Now to your questions:
>
>- For processing time session windows, watermarks play no role
>whatsoever, you simply assume that you’ve seen all events belonging so a
>single transaction id if the last such event for a specific transaction id
>was processed sessionWindowGap milliseconds ago
>- Therefore you see all enriched incoming events the latest
>sessionWindowGap ms after the last incoming event (+ some latency)
>- In event time mode and resp event time session windows the situation
>is exactly the same, only that processing time play no role
>- A watermark means (ideally) that no event older than the watermark
>time ever follows the watermark (which itself is a meta-event that flows
>with the proper events on the same channels)
>- In order for a session gap window to forward the enriched events the
>window operator needs to receive a watermark that is sessionWindowGap
>milliseconds beyond the latest incoming event (in terms of the respective
>event time)
>- The watermark generator in order to generate a new watermark that
>triggers this last session window above needs to encounter an (any) event
>that has a timestamp of ( + outOfOrderness
>+ sessionWindowGap + 1ms)
>- Remember, the watermark generator never generated watermarks based
>on processing time, but only based on the timestamps it has seen in events
>actually encountered
>- Coming back to your idleness configuration: it only means that the
>incoming stream becomes idle == timeless after a while … i.e. watermarks
>won’t make progress from this steam, and it tells all downstream operators
>- Idleness specification is only useful if a respective operator has
>another source of valid watermarks (i.e. after a union of two streams, one
>active/one idle ….). this is not your case
>
>
>
> I hope this clarifies your situation.
>
>
>
> Cheers
>
>
>
>
>
> Thias
>
>
>
>
>
> *From:* HG 
> *Sent:* Wednesday, 16 March 2022 10:06
> *To:* user 
> *Subject:* Watermarks event time vs processing time
>
>
>
>
>
>
> Hi,
>
>
>
> I read from a Kafka topic events that are in JSON format
>
> These event contain a handling time (aka event time) in epoch
> milliseconds, a transaction_id and a large nested JSON structure.
>
> I need to group the events by transaction_id, order them by handling time
> and calculate the differences in handling time.
>
> The events are updated with this calculated elapsed time and pushed
> further.
>
> So all events that go in should come out with the elapsed time added.
>
>
>
> For testing I use events that 

RE: Watermarks event time vs processing time

2022-03-16 Thread Schwalbe Matthias
Hi Hanspeter,

Let me offer some hints that might help you get the concepts clearer.

From your description I make the following assumptions where you are not specific
enough (please confirm or correct in your answer):

  1.  You store incoming events in state per transaction_id, to be
sorted/aggregated (min/max time) by event time later on
  2.  So far you used a session window to determine the point in time when to
emit the stored/enriched/sorted events
  3.  Watermarks are generated with bounded out-of-orderness
  4.  You use session windows with a specific gap
  5.  In your experiment you only ever send 1000 events and then stop producing
incoming events

Now to your questions:

  *   For processing-time session windows, watermarks play no role whatsoever; you simply assume
that you have seen all events belonging to a single transaction id if the last such event for
that transaction id was processed sessionWindowGap milliseconds ago
  *   Therefore you see all enriched incoming events at the latest sessionWindowGap ms after the
last incoming event (+ some latency)
  *   In event time mode, and respectively event-time session windows, the situation is exactly
the same, only that processing time plays no role
  *   A watermark means (ideally) that no event older than the watermark time ever follows the
watermark (which itself is a meta-event that flows with the proper events on the same channels)
  *   In order for a session gap window to forward the enriched events, the window operator needs
to receive a watermark that is sessionWindowGap milliseconds beyond the latest incoming event (in
terms of the respective event time)
  *   In order to generate a new watermark that triggers this last session window, the watermark
generator needs to encounter an (any) event that has a timestamp of (<timestamp of the last
event> + outOfOrderness + sessionWindowGap + 1 ms) (see the small sketch after this list)
  *   Remember, the watermark generator never generates watermarks based on processing time, but
only based on the timestamps it has seen in events actually encountered
  *   Coming back to your idleness configuration: it only means that the incoming stream becomes
idle == timeless after a while … i.e. watermarks won't make progress from this stream, and this
is signalled to all downstream operators
  *   An idleness specification is only useful if the respective operator has another source of
valid watermarks (e.g. after a union of two streams, one active / one idle …). This is not your
case
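
A small, self-contained sketch of the arithmetic in the watermark bullet above (the gap and
out-of-orderness values are made-up assumptions; the last timestamp is taken from the example
data in this thread):

public class SessionWindowTriggerMath {
    public static void main(String[] args) {
        long lastHandlingTime = 1646992800560L; // timestamp of the last real event
        long outOfOrdernessMs = 5_000L;         // assumed bounded out-of-orderness
        long sessionGapMs = 10_000L;            // assumed session window gap

        // Per the bullet above: an event with at least this timestamp has to show up
        // on every Kafka partition before the last event-time session window can fire.
        long requiredTimestamp = lastHandlingTime + outOfOrdernessMs + sessionGapMs + 1;

        // A bounded-out-of-orderness generator keeps the watermark roughly outOfOrderness
        // behind the highest timestamp it has seen, so only then does the watermark pass
        // the end of the session window (last event + gap).
        long resultingWatermark = requiredTimestamp - outOfOrdernessMs;
        System.out.println("watermark " + resultingWatermark
                + " passes window end " + (lastHandlingTime + sessionGapMs));
    }
}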

I hope this clarifies your situation.

Cheers


Thias


From: HG 
Sent: Wednesday, 16 March 2022 10:06
To: user 
Subject: Watermarks event time vs processing time



Hi,

I read from a Kafka topic events that are in JSON format
These event contain a handling time (aka event time) in epoch milliseconds, a 
transaction_id and a large nested JSON structure.
I need to group the events by transaction_id, order them by handling time and 
calculate the differences in handling time.
The events are updated with this calculated elapsed time and pushed further.
So all events that go in should come out with the elapsed time added.

For testing I use events that are old (so handling time is not nearly the wall 
clock time)
Initially I used EventTimeSessionWindows but somehow the processing did not run 
as expected.
When I pushed 1000 events eventually 800 or so would appear at the output.
This was resolved by switching to ProcessingTimeSessionWindows .
My thought was then that I could remove the watermarkstrategies with watermarks 
with timestamp assigners (handling time) for the Kafka input stream and the 
data stream.
However this was not the case.

Can anyone enlighten me as to why the watermark strategies are still needed?

Below the code

KafkaSource source = KafkaSource.builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type", trustStoreType)
.setProperty("ssl.truststore.password", trustStorePassword)
.setProperty("ssl.truststore.location", trustStoreLocation)
.setProperty("security.protocol", securityProtocol)

.setProperty("partition.discovery.interval.ms",
 partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint", 
commitOffsetsOnCheckpoint)
.setGroupId(inputGroupId)
.setClientIdPrefix(clientId)
.setTopics(kafkaInputTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new 
JSONKeyValueDeserializationSchema(fetchMetadata)))

.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();

/* A watermark is needed to prevent duplicates! */
WatermarkStrategy kafkaWmstrategy = WatermarkStrategy

.forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
.withIdl

Adding a custom Kafka deserializer to Statefun Job

2022-03-16 Thread Christopher Gustafson
Hi,


I am writing a StateFun application using remote functions, where I want to 
include a custom Kafka deserializer that adds the timestamp of the Kafka 
ingress messages to the messages sent between my remote functions. I can't seem 
to find a solution to this using remote functions, as the current option using 
Custom Types only has examples that look at the value of the incoming Kafka 
messages and not the other metadata.


I have been looking at the following example:

 public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
     TypeName.typeNameFromString("com.example/User"),
     mapper::writeValueAsBytes,
     bytes -> mapper.readValue(bytes, User.class));


But instead of just deserializing by reading the value, I want to use my own 
KafkaIngressDeserializer that also adds the timestamp to the Object being 
returned. Is there a way of doing so?


Best Regards,

Christopher Gustafson


Re: Rescaling REST API not working

2022-03-16 Thread 胡伟华
Hi Aarsh Shah,

After checking, I found that this API has been temporarily deprecated, and there is currently
no API for rescaling.
Maybe you can try the reactive scheduler (a config sketch follows below).
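
For reference, a minimal flink-conf.yaml sketch for trying reactive mode (a sketch only;
reactive mode is limited to standalone application-mode deployments, so please check the docs
for your setup):

# flink-conf.yaml (sketch): with the reactive scheduler the job automatically
# rescales to however many TaskManagers are currently available.
scheduler-mode: reactive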

> On 16 Mar 2022 at 2:15 PM, Aarsh Shah  wrote:
> 
> Hello, 
> I tried to call the rescaling api with patch to automatically rescale, but it 
> shows 503, is it deprecated? because it is present in the docs too, and if it 
> is deprecated, is there any API through which I can rescale directly? Because 
> the mode which we are using is not the reactive mode, we need to either come 
> up with a script kind of thing or if there is an API, our time would be 
> saved. 
> Please help.
> Thank you.



Re: RocksDB metrics for effective memory consumption

2022-03-16 Thread Yun Tang
Hi Donatien,

Flink's managed memory is actually located off-heap and lives as native memory, e.g. memory
consumed by RocksDB or Python. In other words, the JVM cannot know how much memory such
third-party software has used.
Thus, Flink just sets an upper limit for the managed memory and lets the third-party software
consume it. That's why you see the managed memory reported as fully allocated right from the
beginning.

If you want to know the memory used by RocksDB, you should use jeprof + jemalloc to inspect the
memory malloc'ed on the RocksDB side, or refer to the block cache usage reported by RocksDB [1]
(a config sketch follows below). Please note that all RocksDB instances within the same slot
share the same block cache, so they will all report the same usage.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-metrics-block-cache-usage
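
For reference, a flink-conf.yaml sketch that enables the metric from [1] (native RocksDB
metrics are off by default because collecting them adds some overhead):

# flink-conf.yaml (sketch): expose RocksDB's own view of block cache usage
state.backend: rocksdb
state.backend.rocksdb.metrics.block-cache-usage: true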

Best
Yun Tang

From: Donatien Schmitz 
Sent: Tuesday, March 15, 2022 19:45
To: user@flink.apache.org 
Subject: RocksDB metrics for effective memory consumption

Hi,

I am working on the analysis of the memory consumption of RocksDB state backend 
for simple DAGs. I would like to check fine-grained memory utilization of 
RocksDB with the native metrics (reported on Prometheus+Grafana). RocksDB uses 
Managed memory allocated to each TaskManager but this value peaks at the 
beginning of the job. Is the managed memory always allocated at full even if it 
would not be necessary?

For my experiments I am using a simple DAG consisting of Source (FS) -> Map -> 
DiscardSink. The Map does not process anything but stores the latest value of 
the KeyedStream keys (with predicted amount of keys in the dataset and constant 
value size (1024 bytes)).

If anyone has some more insights on the memory utilization of RocksDB at Flink's
level, I would appreciate it.

Best,

Donatien Schmitz
PhD Student


Watermarks event time vs processing time

2022-03-16 Thread HG
Hi,

I read events in JSON format from a Kafka topic.
These events contain a handling time (aka event time) in epoch milliseconds,
a transaction_id and a large nested JSON structure.
I need to group the events by transaction_id, order them by handling time
and calculate the differences in handling time.
The events are updated with this calculated elapsed time and pushed
further.
So all events that go in should come out with the elapsed time added.

For testing I use events that are old (so the handling time is nowhere near the
wall clock time).
Initially I used EventTimeSessionWindows, but somehow the processing did not
run as expected:
when I pushed 1000 events, eventually only 800 or so would appear at the output.
This was resolved by switching to ProcessingTimeSessionWindows.
My thought was then that I could remove the watermark strategies with
timestamp assigners (handling time) for the Kafka input
stream and the data stream.
However, this was not the case.

Can anyone enlighten me as to why the watermark strategies are still needed?

Below is the code:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(inputGroupId)
        .setClientIdPrefix(clientId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(fetchMetadata)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();

/* A watermark is needed to prevent duplicates! */
WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
        .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
        .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
        .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
            @Override
            public long extractTimestamp(ObjectNode element, long eventTime) {
                return element.get("value").get("handling_time").asLong();
            }
        });

/* Use the watermark strategy to create a datastream */
DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");

/* Split the ObjectNode into a Tuple4 */
DataStream<Tuple4<Long, String, String, String>> tuple4ds = ds.flatMap(new Splitter());

WatermarkStrategy<Tuple4<Long, String, String, String>> wmStrategy = WatermarkStrategy
        .<Tuple4<Long, String, String, String>>forBoundedOutOfOrderness(
                Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
        .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, String, String, String>>() {
            @Override
            public long extractTimestamp(Tuple4<Long, String, String, String> element, long eventTime) {
                return element.f0;
            }
        });

DataStream<Tuple4<Long, String, String, String>> tuple4dswm =
        tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
        .keyBy(new KeySelector<Tuple4<Long, String, String, String>, String>() {
            @Override
            public String getKey(Tuple4<Long, String, String, String> value) throws Exception {
                return value.f2;
            }
        })
        .window(ProcessingTimeSessionWindows.withGap(
                Time.seconds(Integer.parseInt(sessionWindowGap))))
        .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
        .process(new MyProcessWindowFunction());

KafkaSink<String> kSink = KafkaSink.<String>builder()
        .setBootstrapServers(outputBrokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(kafkaOutputTopic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

// Sink to the Kafka topic
tuple4DsWmKeyedbytr.sinkTo(kSink);
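
For completeness, a hedged sketch of what the Splitter referenced above might look like (it is
not part of the original post; the Tuple4 layout <Long, String, String, String> used here, and
in the restored generics above, is an assumption, since only f0 = handling_time and f2 =
transaction_id are visible in the posted code, and the shaded Jackson import path can differ
between Flink versions):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.util.Collector;

public class Splitter implements FlatMapFunction<ObjectNode, Tuple4<Long, String, String, String>> {
    @Override
    public void flatMap(ObjectNode record, Collector<Tuple4<Long, String, String, String>> out) {
        // JSONKeyValueDeserializationSchema wraps each Kafka record as {"key":..., "value":..., "metadata":...}
        JsonNode value = record.get("value");
        out.collect(Tuple4.of(
                value.get("handling_time").asLong(),    // f0: event time in epoch millis
                "",                                      // f1: unknown from the post, placeholder
                value.get("transaction_id").asText(),    // f2: grouping key
                value.toString()));                      // f3: the full (nested) payload
    }
}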

Re: [ANNOUNCE] Apache Flink 1.14.4 released

2022-03-16 Thread Xingbo Huang
Hi Konstantin,

I have installed the wheel packages of 1.13.6 and 1.14.4 respectively, and
tested them through some examples. Thanks a lot for your work.

Best,
Xingbo

Konstantin Knauf  wrote on Wed, 16 Mar 2022 at 15:29:

> Hi Xingbo,
>
> you are totally right. Thank you for noticing. This also affected Flink
> 1.13.6, the other release I was recently managing. I simply skipped a step
> in the release guide.
>
> It should be fixed now. Could you double-check?
>
> Cheers,
>
> Konstantin
>
> On Wed, Mar 16, 2022 at 4:07 AM Xingbo Huang  wrote:
>
> > Thanks a lot for being our release manager Konstantin and everyone who
> > contributed. I have a question about pyflink. I see that there are no
> > corresponding wheel packages uploaded on pypi, only the source package is
> > uploaded. Is there something wrong with building the wheel packages?
> >
> > Best,
> > Xingbo
> >
> > Leonard Xu  wrote on Wed, 16 Mar 2022 at 01:02:
> >
> >> Thanks a lot for being our release manager Konstantin and everyone who
> >> involved!
> >>
> >> Best,
> >> Leonard
> >>
> >> On 15 Mar 2022 at 9:34 PM, Martijn Visser  wrote:
> >>
> >> Thank you Konstantin and everyone who contributed!
> >>
> >>
> >>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


Re: Setting S3 as State Backend in SQL Client

2022-03-16 Thread Martijn Visser
Hi dz902,

I actually can't find that sentence on the website you've linked to. It
does state "The following sections list all available options that can be
used to adjust Flink Table & SQL API programs.". So that list contains the
available options that you can use. The options that you're trying to set are
not included in the list, so you can't set them from the SQL Client.

Options that you set in flink-conf.yaml are applicable to the jobs on the
cluster, so they also apply to the ones created via the SQL Client (see the sketch below).
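
For example, a flink-conf.yaml sketch with the keys from this thread (the bucket path is a
placeholder):

# flink-conf.yaml (sketch): cluster-wide state backend and checkpoint storage,
# which jobs submitted through the SQL Client will then pick up
state.backend: hashmap
state.checkpoint-storage: filesystem
state.checkpoints.dir: s3://<your-bucket>/checkpoints/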

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82


On Wed, 16 Mar 2022 at 08:43, dz902  wrote:

> Hi,
>
> Per SQL Lite doc (
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/)
> I see this:
>
> > SQL Client Configuration
> > You can configure the SQL client by setting the options below, or any
> valid Flink configuration entry:
>
> So any valid Flink configuration should work? For example,
> execution.checkpointing.interval='10s' works despite not being listed there.
>
>
> On Wed, Mar 16, 2022 at 3:07 PM Paul Lam  wrote:
>
>> Hi,
>>
>> If I remember correctly, set operations supports only a limited set of
>> configurations.
>>
>> Most of them are table options that are listed on table configuration [1]
>> plus some pipeline options.
>>
>> State backend options are not likely one of them.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/
>>
>> Best,
>> Paul Lam
>>
>> On 15 Mar 2022 at 19:21, dz902  wrote:
>>
>> Just tried editing flink-conf.yaml and it seems SQL Client does not
>> respect that also. Is this an intended behavior?
>>
>> On Tue, Mar 15, 2022 at 7:14 PM dz902  wrote:
>>
>>> Hi,
>>>
>>> I'm using Flink 1.14 and was unable to set S3 as state backend. I tried
>>> combination of:
>>>
>>> SET state.backend='filesystem';
>>> SET state.checkpoints.dir='s3://xxx/checkpoints/';
>>> SET state.backend.fs.checkpointdir='s3://xxx/checkpoints/';
>>> SET state.checkpoint-storage='filesystem'
>>>
>>> As well as:
>>>
>>> SET state.backend='hashmap';
>>>
>>> Which covered both legacy 1.13 way to do it and 1.14 new way to do it.
>>>
>>> None worked. In the Web UI I see checkpoints being made to the Job
>>> Manager continuously. Configuration reads:
>>>
>>> - Checkpoint Storage: JobManagerCheckpointStorage
>>> - State Backend: HashMapStateBackend
>>>
>>> Is this a bug? Is there a way to set state backend to S3 using SQL
>>> Client?
>>>
>>> Thanks,
>>> Dai
>>>
>>>
>>>
>>


Re: Setting S3 as State Backend in SQL Client

2022-03-16 Thread dz902
Hi,

Per the SQL Client doc (
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/)
I see this:

> SQL Client Configuration
> You can configure the SQL client by setting the options below, or any
valid Flink configuration entry:

So any valid Flink configuration should work? For example,
execution.checkpointing.interval='10s' works despite not being listed there.


On Wed, Mar 16, 2022 at 3:07 PM Paul Lam  wrote:

> Hi,
>
> If I remember correctly, set operations supports only a limited set of
> configurations.
>
> Most of them are table options that are listed on table configuration [1]
> plus some pipeline options.
>
> State backend options are not likely one of them.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/
>
> Best,
> Paul Lam
>
> On 15 Mar 2022 at 19:21, dz902  wrote:
>
> Just tried editing flink-conf.yaml and it seems SQL Client does not
> respect that also. Is this an intended behavior?
>
> On Tue, Mar 15, 2022 at 7:14 PM dz902  wrote:
>
>> Hi,
>>
>> I'm using Flink 1.14 and was unable to set S3 as state backend. I tried
>> combination of:
>>
>> SET state.backend='filesystem';
>> SET state.checkpoints.dir='s3://xxx/checkpoints/';
>> SET state.backend.fs.checkpointdir='s3://xxx/checkpoints/';
>> SET state.checkpoint-storage='filesystem'
>>
>> As well as:
>>
>> SET state.backend='hashmap';
>>
>> Which covered both legacy 1.13 way to do it and 1.14 new way to do it.
>>
>> None worked. In the Web UI I see checkpoints being made to the Job
>> Manager continuously. Configuration reads:
>>
>> - Checkpoint Storage: JobManagerCheckpointStorage
>> - State Backend: HashMapStateBackend
>>
>> Is this a bug? Is there a way to set state backend to S3 using SQL Client?
>>
>> Thanks,
>> Dai
>>
>>
>>
>


Re: [ANNOUNCE] Apache Flink 1.14.4 released

2022-03-16 Thread Konstantin Knauf
Hi Xingbo,

you are totally right. Thank you for noticing. This also affected Flink
1.13.6, the other release I was recently managing. I simply skipped a step
in the release guide.

It should be fixed now. Could you double-check?

Cheers,

Konstantin

On Wed, Mar 16, 2022 at 4:07 AM Xingbo Huang  wrote:

> Thanks a lot for being our release manager Konstantin and everyone who
> contributed. I have a question about pyflink. I see that there are no
> corresponding wheel packages uploaded on pypi, only the source package is
> uploaded. Is there something wrong with building the wheel packages?
>
> Best,
> Xingbo
>
> Leonard Xu  wrote on Wed, 16 Mar 2022 at 01:02:
>
>> Thanks a lot for being our release manager Konstantin and everyone who
>> involved!
>>
>> Best,
>> Leonard
>>
>> On 15 Mar 2022 at 9:34 PM, Martijn Visser  wrote:
>>
>> Thank you Konstantin and everyone who contributed!
>>
>>
>>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Setting S3 as State Backend in SQL Client

2022-03-16 Thread Paul Lam
Hi,

If I remember correctly, SET operations support only a limited set of
configurations.

Most of them are table options that are listed on the table configuration page [1],
plus some pipeline options.

State backend options are likely not among them.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/
 


Best,
Paul Lam

> On 15 Mar 2022 at 19:21, dz902  wrote:
> 
> Just tried editing flink-conf.yaml and it seems SQL Client does not respect 
> that also. Is this an intended behavior?
> 
> On Tue, Mar 15, 2022 at 7:14 PM dz902  > wrote:
> Hi,
> 
> I'm using Flink 1.14 and was unable to set S3 as state backend. I tried 
> combination of:
> 
> SET state.backend='filesystem';
> SET state.checkpoints.dir='s3://xxx/checkpoints/';
> SET state.backend.fs.checkpointdir='s3://xxx/checkpoints/';
> SET state.checkpoint-storage='filesystem'
> 
> As well as:
> 
> SET state.backend='hashmap';
> 
> Which covered both legacy 1.13 way to do it and 1.14 new way to do it.
> 
> None worked. In the Web UI I see checkpoints being made to the Job Manager 
> continuously. Configuration reads:
> 
> - Checkpoint Storage: JobManagerCheckpointStorage
> - State Backend: HashMapStateBackend
> 
> Is this a bug? Is there a way to set state backend to S3 using SQL Client?
> 
> Thanks,
> Dai
> 
>