convert DataStream to DataStream

2022-12-13 Thread Noel OConnor
Hi, I'm reading from a Kafka topic and performing some custom Avro deserialisation, but now I want to create a changelog stream from the source Kafka topic. I'm currently creating a temporary view, then selecting * from it, and finally converting the resulting table to a changelog stream via

Re: Accessing kafka message key from a KafkaSource

2022-12-08 Thread Noel OConnor
o that you can extract Kafka message key, > headers, timestamp, etc. > > Then pass that when you create a KafkaSource via "setDeserializer" method. > > On Wed, Dec 7, 2022 at 6:14 AM Noel OConnor wrote: >> >> Hi, >> I'm using a kafka source to read in messages

Accessing kafka message key from a KafkaSource

2022-12-07 Thread Noel OConnor
Hi, I'm using a Kafka source to read messages from Kafka into a DataStream. However, I can't seem to access the key of the Kafka message in the DataStream. Is this even possible? Cheers, Noel

Re: Concatenating a bounded and unbounded stream

2022-10-27 Thread Noel OConnor
, Oct 27, 2022 at 5:14 AM Jin Yi wrote: > > would using a hybrid source work for you if it's the same type between the > sources? > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/ > > On Wed, Oct 26, 2022 at 8:05 AM Noel OConno

Concatenating a bounded and unbounded stream

2022-10-26 Thread Noel OConnor
Hi, I need to create a new stream from a bounded and an unbounded stream, i.e. create a stream populated with messages from the bounded stream and, once the bounded stream ends, continuously add messages from the unbounded stream to the new stream. I know there are stream unions but the key
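The ordering asked for here (everything from the bounded input first, then the unbounded input indefinitely) can be illustrated without any Flink API. The following is a minimal plain-Java sketch; `ConcatSketch`, `concat` and `take` are hypothetical names made up for illustration. Inside Flink, the hybrid source suggested in the reply to this thread is the connector-level way to get a comparable switch-over.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical illustration only; this is not Flink code.
class ConcatSketch {

    /** Returns an iterator that drains `bounded` completely, then continues with `unbounded`. */
    static <T> Iterator<T> concat(Iterator<T> bounded, Iterator<T> unbounded) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return bounded.hasNext() || unbounded.hasNext();
            }

            @Override
            public T next() {
                // Unbounded elements are only emitted once the bounded input is exhausted.
                return bounded.hasNext() ? bounded.next() : unbounded.next();
            }
        };
    }

    /** Collects at most `n` elements so that an endless iterator can be inspected. */
    static <T> List<T> take(Iterator<T> it, int n) {
        List<T> out = new ArrayList<>();
        while (out.size() < n && it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        Iterator<String> history = List.of("h1", "h2", "h3").iterator();
        // Endless generator standing in for the unbounded stream.
        Iterator<String> live = Stream.iterate(1, i -> i + 1).map(i -> "live" + i).iterator();
        System.out.println(take(concat(history, live), 5)); // [h1, h2, h3, live1, live2]
    }
}
```

A plain union would interleave both inputs instead of guaranteeing that the bounded one finishes first, which is the difference the question is pointing at.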

Re: Sorting by source event time

2022-09-27 Thread Noel OConnor
g order, I think you can try to set the > internal configuration `__table.exec.sort.non-temporal.enabled__` to true. > But remember it's just experimental, which may bring unexpected behavior. > > Best regards, > Yuxia > > - Original Message - > From: "Noel OConnor" >

Sorting by source event time

2022-09-26 Thread Noel OConnor
Hi, I have a temporary view created from a datastream. tableEnv.createTemporaryView("productDetailsView", productStream, Schema.newBuilder() .columnByMetadata("eventTime", "TIMESTAMP_LTZ(3)", "rowtime", Boolean.TRUE)

Re: Enrichment of stream from another stream

2022-09-19 Thread Noel OConnor
temporal join." > > I'd try to use the SQL for the first part of the job to make this join, and > then if I need the DataStream API convert it. > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins > > > On Sep 17, 2022,

Enrichment of stream from another stream

2022-09-17 Thread Noel OConnor
Hi, I'm trying to determine the best way to enrich the event payload of a fast-moving incoming stream with values from another stream that is far slower-moving. I'm converting the second stream into a table for continuous query functionality and I wonder what is the best way to take the values
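As a rough illustration of the enrichment pattern described here, the plain-Java sketch below keeps the latest value per key from the slow stream and attaches it to each fast event as it arrives. It is not Flink code: `EnrichmentSketch`, `onSlowUpdate`, `onFastEvent` and the `"unknown"` fallback are all hypothetical. It also looks up whatever value is current at arrival time, which is simpler than the event-time temporal join recommended in the reply to this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not Flink code.
class EnrichmentSketch {

    // Latest known value per key, fed by the slow-moving stream.
    private final Map<String, String> latestByKey = new HashMap<>();

    /** Called for every record of the slow stream; overwrites the previous value for the key. */
    void onSlowUpdate(String key, String value) {
        latestByKey.put(key, value);
    }

    /** Called for every record of the fast stream; appends the current value for the key. */
    String onFastEvent(String key, String payload) {
        String extra = latestByKey.getOrDefault(key, "unknown");
        return payload + "|" + extra;
    }

    public static void main(String[] args) {
        EnrichmentSketch enricher = new EnrichmentSketch();
        enricher.onSlowUpdate("p1", "price=10");
        System.out.println(enricher.onFastEvent("p1", "order-1")); // order-1|price=10
        enricher.onSlowUpdate("p1", "price=12");
        System.out.println(enricher.onFastEvent("p1", "order-2")); // order-2|price=12
        System.out.println(enricher.onFastEvent("p2", "order-3")); // order-3|unknown
    }
}
```

In a real job the map would have to live in managed keyed state so that it survives restarts and scales with the key space.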

Re: DataStream and DataStreamSource

2022-09-14 Thread Noel OConnor
hods extended from > SingleOutputStreamOperator. > > Best regards, > Jing > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/ > > On Wed, Sep 14, 2022 at 3:23 PM Noel OConnor wrote: >> >> Hi, >> I'm new to flink an

DataStream and DataStreamSource

2022-09-14 Thread Noel OConnor
Hi, I'm new to Flink and I'm trying to integrate it with Apache Pulsar. I've gone through the demos and I get how they work, but one aspect that I can't figure out is the difference between a DataStream and a DataStreamSource. When would you use one over the other? Cheers, Noel