About FlinkKafkaConsumer msg delay PyFlink [1.15]
Hi: we use PyFlink [1.15], but we find it has a large delay, around 500 ms on average, while the same logic written in Java shows a delay in the range of 1-6 ms. Is there any idea how to fix it? Thanks

pyflink demo code:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
import time
import json


def mymap(value):
    # Compare the timestamp carried in the message with the current time.
    now = time.time()
    sv = json.loads(value)
    num = float(sv)
    print(now, "recv:", value, "span:", now - num)
    return sv + "_" + str(now)


def demo1():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC)
    env.set_parallelism(1)

    # Start the consumer
    deserialization_schema = SimpleStringSchema()
    kafka_props = {
        'bootstrap.servers': "127.0.0.1:9092",
        'group.id': "test_group_1",
    }
    kafka_source = FlinkKafkaConsumer(
        topics="kafka_demo",
        deserialization_schema=deserialization_schema,
        properties=kafka_props,
    )
    ds = env.add_source(kafka_source).set_parallelism(1)

    serialization_schema = SimpleStringSchema()
    kafka_producer = FlinkKafkaProducer(
        topic="test_producer_topic",
        serialization_schema=serialization_schema,
        producer_config=kafka_props)

    ds.map(mymap, Types.STRING()).add_sink(kafka_producer)
    env.execute("Test")


if __name__ == '__main__':
    print("start flink_demo1")
    demo1()

java code:

package com.lhhj;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {

    // CharSchema: custom byte[] (de)serialization schema, not included in this snippet.

    public static byte[] ProcessMsg(byte[] value) {
        try {
            long now = System.currentTimeMillis();
            String sb = new String(value, "UTF-8");
            double recvf = Double.parseDouble(sb) * 1000;
            long recv = (long) recvf;
            System.out.println("recv msg " + recv + " now:" + now + " diff:" + (now - recv));
            String ret = Long.toString(recv) + "_" + Long.toString(now);
            return ret.getBytes();
        } catch (Exception e) {
            System.out.println("err msg:" + e.getMessage());
            return value;
        }
    }

    public static void main(String[] args) {
        System.out.println("Hello World! FlinkDelayTest");
        String broker = "127.0.0.1:9092";

        KafkaSource<byte[]> source = KafkaSource.<byte[]>builder()
                .setBootstrapServers(broker)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setTopics("kafka_demo")
                .setValueOnlyDeserializer(new CharSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<byte[]> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "cal_req");

        KafkaSink<byte[]> sink = KafkaSink.<byte[]>builder()
                .setBootstrapServers(broker)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setValueSerializationSchema(new CharSchema())
                        .setTopicSelector((record) -> "test_producer_topic")
                        .build())
                .build();

        stream.map(new MapFunction<byte[], byte[]>() {
            @Override
            public byte[] map(byte[] value) {
                return ProcessMsg(value);
            }
        }).filter(new FilterFunction<byte[]>() {
            @Override
            public boolean filter(byte[] value) throws Exception {
                return value.length > 0;
            }
        }).sinkTo(sink);

        try {
            env.execute("FlinkDelayTest");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

max...@foxmail.com
About the Current22 event
Hi my Flink fellas, The CFP for the Current22 [1] event is about to close. The Current event is the next generation of KafkaSummit. It expands the scope to cover **ALL** the technologies for real-time data, not limited to Kafka. Given Flink is a leading project in this area, the program committee is actively looking for speakers from the Flink community. Please don't hesitate to submit a talk [2] if you are interested! Thanks, Jiangjie (Becket) Qin [1] https://2022.currentevent.io/website/39543/ [2] https://sessionize.com/current-2022/
Re: context.timestamp null in keyedprocess function
hi. Could you share more info with us, e.g. the exception stack? Do you set the assigner for all the sources? I think you can modify the KeyedProcessFunction to print the message whose timestamp is null.

Best,
Shengkai

bat man wrote on Wed, Jun 15, 2022 at 14:57:

> Has anyone experienced this or has any clue?
>
> On Tue, Jun 14, 2022 at 6:21 PM bat man wrote:
>
>> Hi,
>>
>> We are using flink 12.1 on AWS EMR. The job reads the event stream and
>> enrich stream from another topic.
>> We extend AssignerWithPeriodicWatermarks to assign watermarks and extract
>> timestamp from the event and handle idle source partitions.
>> AutoWatermarkInterval set to 5000L.
>> The timestamp extractor looks like below -
>>
>> @Override
>> public long extractTimestamp(Raw event, long previousElementTimestamp) {
>>     lastRecordProcessingTime = System.currentTimeMillis();
>>     Double eventTime =
>>         Double.parseDouble(event.getTimestamp().toString()).longValue();
>>     long timestamp = Instant.ofEpochMilli(eventTime * 1_000).toEpochMilli();
>>     if (timestamp > currentMaxTimestamp) {
>>         currentMaxTimestamp = timestamp;
>>     }
>>     return timestamp;
>> }
>>
>> Second step the rules are joined to events, this is done in keyedprocess
>> function.
>> What we have observed is that at times when the job starts consuming from
>> the beginning of the event source stream, the timestamp accessed in
>> the keyedprocess fn using context.timestamp comes as null and the code is
>> throwing NPE.
>> This happens only for some records intermittently and the same event when
>> we try to process in another environment it processes fine, that means the
>> event is getting parsed fine.
>>
>> What could be the issue, anyone has any idea, because as far as timestamp
>> goes it could only be null if the timestamp extractor sends null.
>>
>> Thanks.
>>
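For what it's worth, a minimal generic sketch of that last suggestion (the class name and the pass-through behaviour are only illustrative, not the original job's function): guard on ctx.timestamp() before using it and log the offending record instead of hitting the NPE.

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Passes records through unchanged, but logs any record whose event timestamp
// is null instead of letting downstream logic fail with a NullPointerException.
public class TimestampGuard<K, T> extends KeyedProcessFunction<K, T, T> {

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        Long ts = ctx.timestamp();  // null if no event timestamp was assigned to this record
        if (ts == null) {
            System.err.println("Record without event timestamp: " + value);
            return;  // or route it to a side output for later inspection
        }
        out.collect(value);
    }
}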
Re: Flink config driven tool ?
You are just spamming the inbox by sending these emails. You can just ignore sending those emails if it isn't adding any value. Regards, Sucheth Shivakumar website : https://sucheths.com mobile : +1(650)-576-8050 San Mateo, United States On Wed, Jun 15, 2022 at 1:41 PM sri hari kali charan Tummala < kali.tumm...@gmail.com> wrote: > I would have helped you if it was written in Scala instead of Java. > > On Wed, Jun 15, 2022 at 2:22 AM Rakshit Ramesh < > rakshit.ram...@datakaveri.org> wrote: > >> I'm working on such a thing. >> It's in early stages and needs a lot more work. >> I'm open to collaborating. >> https://github.com/datakaveri/iudx-adaptor-framework >> >> On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala < >> kali.tumm...@gmail.com> wrote: >> >>> Hi Flink Community, >>> >>> can someone point me to a good config-driven flink data movement tool >>> Github repos? Imagine I build my ETL dag connecting source --> >>> transformations --> target just using a config file. >>> >>> below are a few spark examples:- >>> https://github.com/mvrpl/big-shipper >>> https://github.com/BitwiseInc/Hydrograph >>> >>> Thanks & Regards >>> Sri Tummala >>> >>> > > -- > Thanks & Regards > Sri Tummala > >
Re: Flink config driven tool ?
I would have helped you if it was written in Scala instead of Java. On Wed, Jun 15, 2022 at 2:22 AM Rakshit Ramesh < rakshit.ram...@datakaveri.org> wrote: > I'm working on such a thing. > It's in early stages and needs a lot more work. > I'm open to collaborating. > https://github.com/datakaveri/iudx-adaptor-framework > > On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala < > kali.tumm...@gmail.com> wrote: > >> Hi Flink Community, >> >> can someone point me to a good config-driven flink data movement tool >> Github repos? Imagine I build my ETL dag connecting source --> >> transformations --> target just using a config file. >> >> below are a few spark examples:- >> https://github.com/mvrpl/big-shipper >> https://github.com/BitwiseInc/Hydrograph >> >> Thanks & Regards >> Sri Tummala >> >> -- Thanks & Regards Sri Tummala
Re: Flink config driven tool ?
Hi, Just like Shengkai mentioned. I would strongly suggest trying SQL for ETL dag. If you find anything that SQL does not work for you, please share your requirements with us. We might check if it makes sense to build new features in Flink to support them. Best regards, Jing On Wed, Jun 15, 2022 at 11:22 AM Rakshit Ramesh < rakshit.ram...@datakaveri.org> wrote: > I'm working on such a thing. > It's in early stages and needs a lot more work. > I'm open to collaborating. > https://github.com/datakaveri/iudx-adaptor-framework > > On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala < > kali.tumm...@gmail.com> wrote: > >> Hi Flink Community, >> >> can someone point me to a good config-driven flink data movement tool >> Github repos? Imagine I build my ETL dag connecting source --> >> transformations --> target just using a config file. >> >> below are a few spark examples:- >> https://github.com/mvrpl/big-shipper >> https://github.com/BitwiseInc/Hydrograph >> >> Thanks & Regards >> Sri Tummala >> >>
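To make that suggestion concrete, here is a minimal sketch of a pure-SQL ETL pipeline (the table names, schemas, and connector options below are made up for illustration); the DDL and INSERT statements could just as easily be read from a config file and executed one by one:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlEtlJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Source table as DDL (could be loaded from a config file).
        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  order_id STRING," +
                "  amount   DOUBLE," +
                "  ts       TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'orders'," +
                "  'properties.bootstrap.servers' = '127.0.0.1:9092'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");

        // Sink table as DDL.
        tEnv.executeSql(
                "CREATE TABLE big_orders (" +
                "  order_id STRING," +
                "  amount   DOUBLE" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:postgresql://localhost:5432/demo'," +
                "  'table-name' = 'big_orders'" +
                ")");

        // The "dag" itself is just an INSERT INTO ... SELECT statement.
        tEnv.executeSql(
                "INSERT INTO big_orders SELECT order_id, amount FROM orders WHERE amount > 100");
    }
}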
Re: Flink running same task on different Task Manager
Hi Great, Do you mean there is a Task1 and a Task2 on each task manager? If so, I think you can set Task1 and Task2 to the same parallelism and set them in the same slot sharing group. In this way, Task1 and Task2 will be deployed into the same slot (that is, the same task manager). You can get more details about slot sharing groups in [1], and you can see how to set a slot sharing group in [2]. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#task-slots-and-resources [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#set-slot-sharing-group Best, Lijie Weihua Hu wrote on Wed, Jun 15, 2022 at 13:16: > I don't really understand how task2 reads static data from task1, > but I think you can integrate the logic of getting static data from http in > task1 into task2 and keep only one kind of task. > > Best, > Weihua > > > On Wed, Jun 15, 2022 at 10:07 AM Great Info wrote: > > > thanks for helping with some inputs, yes I am using rich function and > > handling objects created in open, and also and network calls are getting > > called in a run. > > but currently, I got stuck running this same task on *all task managers* > > (nodes), when I submit the job, this task1(static data task) runs only > one > > task manager, I have 3 task managers in my Flink cluster. > > > > > > On Tue, Jun 14, 2022 at 7:20 PM Weihua Hu > wrote: > > > >> Hi, > >> > >> IMO, Broadcast is a better way to do this, which can reduce the QPS of > >> external access. > >> If you do not want to use Broadcast, Try using RichFunction, start a > >> thread in the open() method to refresh the data regularly. but be > careful > >> to clean up your data and threads in the close() method, otherwise it > will > >> lead to leaks. > >> > >> Best, > >> Weihua > >> > >> > >> On Tue, Jun 14, 2022 at 12:04 AM Great Info wrote: > >> > >>> Hi, > >>> I have one flink job which has two tasks > >>> Task1- Source some static data over https and keep it in memory, this > >>> keeps refreshing it every 1 hour > >>> Task2- Process some real-time events from Kafka and uses static data to > >>> validate something and transform, then forward to other Kafka topic. > >>> > >>> so far, everything was running on the same Task manager(same node), but > >>> due to some recent scaling requirements need to enable partitioning on > >>> Task2 and that will make some partitions run on other task managers. > but > >>> other task managers don't have the static data > >>> > >>> is there a way to run Task1 on all the task managers? I don't want to > >>> enable broadcasting since it is a little huge and also I can not > persist > >>> data in DB due to data regulations. > >>> > >>> >
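A minimal sketch of that setup (the sources, names, and parallelism are placeholders, not the original job): both pipelines get the same parallelism and the same slot sharing group, so their subtasks can be scheduled into the same slots.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // "Task1": stands in for the pipeline that refreshes the static data.
        DataStream<String> task1 = env
                .socketTextStream("localhost", 9001)   // placeholder source
                .map(v -> "static:" + v)
                .setParallelism(3)
                .slotSharingGroup("co-located");

        // "Task2": stands in for the Kafka-driven processing pipeline.
        DataStream<String> task2 = env
                .socketTextStream("localhost", 9002)   // placeholder source
                .map(v -> "event:" + v)
                .setParallelism(3)                     // same parallelism as Task1
                .slotSharingGroup("co-located");       // same group -> shared slots

        task1.print();
        task2.print();
        env.execute("SlotSharingExample");
    }
}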
Re: New KafkaSource API : Change in default behavior regarding starting offset
Hello Martijn, Thanks for the link to the release note, especially: "When resuming from the savepoint, please use setStartingOffsets(OffsetsInitializer.committedOffsets()) in the new KafkaSourceBuilder to transfer the offsets to the new source." So earliest is the new default. We do use .committedOffsets - we have it by default in our custom KafkaSource builder, to be sure we do not read all the previous data (earliest). What bothers me is just this change of the default starting-offset behavior from FlinkKafkaConsumer to KafkaSource (this can lead to mistakes). In fact it happens that we drop some of our Kafka source state to read again from the committed Kafka offset, but maybe nobody does that ^^ Anyway, thanks for the focus on the release note! Best Regards, -- Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io On Wed, Jun 15, 2022 at 10:58, Martijn Visser wrote: > Hi Bastien, > > When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes > included the instruction how to migrate from FlinkKafkaConsumer to > KafkaConsumer [1]. Looking at the Kafka documentation [2], there is a > section on how to upgrade to the latest connector version that I think is > outdated. I'm leaning towards copying the migration instructions to the > generic documentation. Do you think that would have sufficed? > > Best regards, > > Martijn > > [1] > https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer > [2] > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version > > On Wed, Jun 15, 2022 at 09:22, bastien dine wrote: >> Hello jing, >> >> This was the previous method in old Kafka consumer API, it has been >> removed in 1.15, so source code is not in master anymore, >> Yes I know for the new Offset initializer, committed offset + earliest as >> fallback can be used to have the same behavior as before >> I just wanted to know whether this is a changed behavior or I am missing >> something >> >> Bastien DINE >> Freelance >> Data Architect / Software Engineer / Sysadmin >> http://bastiendine.io >> >> On Tue, Jun 14, 2022 at 23:08, Jing Ge wrote: >>> Hi Bastien, >>> >>> Thanks for asking. I didn't find any call of setStartFromGroupOffsets() >>> within >>> Flink in the master branch. Could you please point out the code that >>> committed offset is used as default? >>> >>> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets() >>> is used, an exception will be thrown at runtime in case there is no >>> committed offset, which is useful if the user is intended to read from the >>> committed offset but something is wrong. It might feel weird if it is used >>> as default, because an exception will be thrown when users start new jobs >>> with default settings. >>> >>> Best regards, >>> Jing >>> >>> On Tue, Jun 14, 2022 at 4:15 PM bastien dine >>> wrote: Hello everyone, Does someone know why the starting offset behaviour has changed in the new Kafka Source ? This is now from earliest (code in KafkaSourceBuilder), doc says: "If offsets initializer is not specified, OffsetsInitializer.earliest() will be used by default." from: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset Before in old FlinkKafkaConsumer it was from committed offset (i.e.: setStartFromGroupOffsets() method) which match with this behaviour in new KafkaSource: OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST) This change can lead to big troubles if user pay no attention to this point when migrating from old KafkaConsumer to new KafkaSource, Regards, Bastien -- Bastien DINE Data Architect / Software Engineer / Sysadmin bastiendine.io >>>
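For anyone migrating, a small sketch (topic, group id, and schema are placeholders) of pinning the starting offsets explicitly so the new KafkaSource behaves like the old setStartFromGroupOffsets() behaviour described in this thread, i.e. committed group offsets with a reset-strategy fallback when none exist:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class CommittedOffsetSource {
    public static KafkaSource<String> build(String broker) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(broker)
                .setTopics("kafka_demo")        // placeholder topic
                .setGroupId("test_group_1")     // placeholder group id
                // Start from the committed group offsets and fall back to EARLIEST
                // when no committed offset exists (the fallback the thread mentions).
                // Without this call the new KafkaSource defaults to earliest.
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}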
Re: Flink config driven tool ?
I'm working on such a thing. It's in early stages and needs a lot more work. I'm open to collaborating. https://github.com/datakaveri/iudx-adaptor-framework On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala < kali.tumm...@gmail.com> wrote: > Hi Flink Community, > > can someone point me to a good config-driven flink data movement tool > Github repos? Imagine I build my ETL dag connecting source --> > transformations --> target just using a config file. > > below are a few spark examples:- > https://github.com/mvrpl/big-shipper > https://github.com/BitwiseInc/Hydrograph > > Thanks & Regards > Sri Tummala > >
Re: New KafkaSource API : Change in default behavior regarding starting offset
Hi Bastien, When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes included the instruction how to migrate from FlinkKafkaConsumer to KafkaConsumer [1]. Looking at the Kafka documentation [2], there is a section on how to upgrade to the latest connector version that I think is outdated. I'm leaning towards copying the migration instructions to the generic documentation. Do you think that would have sufficed? Best regards, Martijn [1] https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version Op wo 15 jun. 2022 om 09:22 schreef bastien dine : > Hello jing, > > This was the previous method in old Kafka consumer API, it has been > removed in 1.15, so source code is not in master anymore, > Yes I know for the new Offset initializer, committed offset + earliest as > fallback can be used to have the same behavior as before > I just wanted to know whether this is a changed behavior or I am missing > something > > > > Bastien DINE > Freelance > Data Architect / Software Engineer / Sysadmin > http://bastiendine.io > > > > Le mar. 14 juin 2022 à 23:08, Jing Ge a écrit : > >> Hi Bastien, >> >> Thanks for asking. I didn't find any call of setStartFromGroupOffsets() >> within >> Flink in the master branch. Could you please point out the code that >> committed offset is used as default? >> >> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets() >> is used, an exception will be thrown at runtime in case there is no >> committed offset, which is useful if the user is intended to read from the >> committed offset but something is wrong. It might feel weird if it is used >> as default, because an exception will be thrown when users start new jobs >> with default settings. >> >> Best regards, >> Jing >> >> On Tue, Jun 14, 2022 at 4:15 PM bastien dine >> wrote: >> >>> Hello everyone, >>> >>> Does someone know why the starting offset behaviour has changed in the >>> new Kafka Source ? >>> >>> This is now from earliest (code in KafkaSourceBuilder), doc says : >>> "If offsets initializer is not specified, OffsetsInitializer.earliest() will >>> be used by default." from : >>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset >>> >>> Before in old FlinkKafkaConsumer it was from committed offset (i.e : >>> setStartFromGroupOffsets() >>> method) >>> >>> which match with this behaviour in new KafkaSource : : >>> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST >>> >>> This change can lead to big troubles if user pay no attention to this >>> point when migrating from old KafkaConsumer to new KafkaSource, >>> >>> Regards, >>> Bastien >>> >>> -- >>> >>> Bastien DINE >>> Data Architect / Software Engineer / Sysadmin >>> bastiendine.io >>> >>
Re: Kafka Consumer commit error
Hi, Thanks for reporting the issue and the demo provided by Christian! I traced the code and think it's a bug in KafkaConsumer (see KAFKA-13563 [1]). We probably need to bump the Kafka client to 3.1 to fix it, but we should check the compatibility issue first because it's crossing a major version of Kafka (2.x -> 3.x). [1] https://issues.apache.org/jira/browse/KAFKA-13563 Best, Qingsheng > On Jun 15, 2022, at 02:14, Martijn Visser wrote: > > Hi Christian, > > There's another similar error reported by someone else. I've linked the > tickets together and asked one of the Kafka maintainers to have a look at > this. > > Best regards, > > Martijn > > On Tue, Jun 14, 2022 at 17:16, Christian Lorenz > wrote: > Hi Alexander, > > > > I've created a Jira ticket here > https://issues.apache.org/jira/browse/FLINK-28060. > > Unfortunately this is causing some issues to us. > > I hope with the attached demo project the root cause of this can also be > determined, as this is reproducible in Flink 1.15.0, but not in Flink 1.14.4. > > > > Kind regards, > > Christian > > > > From: Alexander Fedulov > Date: Monday, June 13, 2022 at 23:42 > To: Christian Lorenz > Cc: "user@flink.apache.org" > Subject: Re: Kafka Consumer commit error > > > > Hi Christian, > > > > thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this > application. Do you think this might still be related? > > > > No, in that case, Kafka transactions are not used, so it should not be > relevant. > > > > Best, > > Alexander Fedulov > > > > On Mon, Jun 13, 2022 at 3:48 PM Christian Lorenz > wrote: > > Hi Alexander, > > > > thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this > application. Do you think this might still be related? > > > > Best regards, > > Christian > > > > > > From: Alexander Fedulov > Date: Monday, June 13, 2022 at 13:06 > To: "user@flink.apache.org" > Cc: Christian Lorenz > Subject: Re: Kafka Consumer commit error > > > > Hi Christian, > > > > you should check if the exceptions that you see after the broker is back from > maintenance are the same as the ones you posted here. If you are using > EXACTLY_ONCE, it could be that the later errors are caused by Kafka purging > transactions that Flink attempts to commit [1]. > > > > Best, > > Alexander Fedulov > > > [1] > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance > > > > On Mon, Jun 13, 2022 at 12:04 PM Martijn Visser > wrote: > > Hi Christian, > > > > I would expect that after the broker comes back up and recovers completely, > these error messages would disappear automagically. It should not require a > restart (only time). Flink doesn't rely on Kafka's checkpointing mechanism > for fault tolerance. > > > > Best regards, > > > > Martijn > > > > On Wed, Jun 8, 2022 at 15:49, Christian Lorenz > wrote: > > Hi, > > > > we have some issues with a job using the flink-sql-connector-kafka (flink > 1.15.0/standalone cluster). If one broker e.g. is restarted for maintenance > (replication-factor=2), the taskmanagers executing the job are constantly > logging errors on each checkpoint creation: > > > > Failed to commit consumer offsets for checkpoint 50659 > > org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException: > Offset commit failed with a retriable exception. You should retry committing > the latest consumed offsets.
> > Caused by: > org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException: > The coordinator is not available. > > > > AFAICT the error itself is produced by the underlying kafka consumer. > Unfortunately this error cannot be reproduced on our test system. > > From my understanding this error might occur once, but follow up checkpoints > / kafka commits should be fine again. > > Currently my only way of “fixing” the issue is to restart the taskmanagers. > > > > Is there maybe some kafka consumer setting which would help to circumvent > this? > > > > Kind regards, > > Christian
Re:Re: How to handle deletion of items using PyFlink SQL?
So when t=C2 arrives, the source connector must send a `DELETE` message downstream saying that row C should be deleted, and send a new 'INSERT' message to notify downstream that a new row D should be inserted into the sink. This source connector is just like a CDC source, but it seems that you need to customize it yourself. The `DELETE` message about row C is a RowData whose RowKind is `DELETE`. When the sink receives this DELETE message, it will tell the DB to delete this data, by either the pk or the whole row if there is no pk. -- Best! Xuyang On 2022-06-14 19:45:06, "John Tipper" wrote: Yes, I’m interested in the best pattern to follow with SQL to allow for a downstream DB using the JDBC SQL connector to reflect the state of rows added and deleted upstream. So imagine there is a crawl event at t=C1 that happens with an associated timestamp and which finds resources A,B,C. Is it better to emit one event into the stream with an array of all resources or many events, each with one resource and a corresponding crawl timestamp. There is obviously a limit to the amount of data that can be in a single event so the latter pattern will scale better for many resources. Flink SQL sees this stream and processes it, then emits to a JDBC sink where there is one row for A, B, C. Later, at t=C2, another crawl happens, finding A, B, D. I want the sink DB to have 3 rows if possible and not have C. Alternatively it should have 4 rows with a tombstone/delete marker on row C so it’s obvious it doesn’t exist any more. I’m interested in a SQL solution if possible. J Sent from my iPhone On 9 Jun 2022, at 11:20, Xuyang wrote: Hi, Dian Fu. I think John's requirement is like a cdc source that the source needs the ability to know which of datas should be deleted and then notify the framework, and that is why I recommendation John to use the UDTF. And hi, John. I'm not sure this doc [1] is enough. BTW, I think you can also try to customize a connector[2] to send `DELETE` RowData to downstream by java and use it in PyFlink SQL, and maybe it's more easy. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/udfs/python_udfs/#table-functions [2] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sourcessinks/#user-defined-sources--sinks -- Best! Xuyang On 2022-06-09 08:53:36, "Dian Fu" wrote: Hi John, If you are using Table API & SQL, the framework is handling the RowKind and it's transparent for you. So usually you don't need to handle RowKind in Table API & SQL. Regards, Dian On Thu, Jun 9, 2022 at 6:56 AM John Tipper wrote: Hi Xuyang, Thank you very much, I’ll experiment tomorrow. Do you happen to know whether there is a Python example of udtf() with a RowKind being set (or whether it’s supported)? Many thanks, John Sent from my iPhone On 8 Jun 2022, at 16:41, Xuyang wrote: Hi, John. What about use udtf [1]? In your UDTF, all resources are saved as a set or map as s1. When t=2 arrives, the new resources as s2 will be collected by crawl. I think what you want is the deletion data that means 's1' - 's2'. So just use loop to find out the deletion data and send RowData in function 'eval' in UDTF, and the RowData can be sent with a RowKind 'DELETE'[2]. The 'DELETE' means tell the downstream that this value is deleted. I will be glad if it can help you.
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions [2] https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52 -- Best! Xuyang At 2022-06-08 20:06:17, "John Tipper" wrote: Hi all, I have some reference data that is periodically emitted by a crawler mechanism into an upstream Kinesis data stream, where those rows are used to populate a sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data Analytics). What is the best pattern to handle deletion of upstream data, such that the downstream table remains in sync with upstream? For example, at t=1, rows R1, R2, R3 are processed from the stream, resulting in a DB with 3 rows. At some point between t=1 and t=2, the resource corresponding to R2 was deleted, such that at t=2 when the next crawl was carried out only rows R1 and R3 were emitted into the upstream stream. How should I process the stream of events so that when I have finished processing the events from t=2 my downstream table also has just rows R1 and R3? Many thanks, John
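As a tiny illustration of the `DELETE` RowData mentioned in this thread (the single-field layout and the helper method are made up for illustration; only RowData/RowKind themselves are Flink API): a custom source or function can emit a retraction like this, and an upsert sink keyed on the resource id will then remove the matching row.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class RowKindExample {

    // Builds a retraction for a previously emitted row, identified here by a single id field.
    public static RowData deleteFor(String resourceId) {
        GenericRowData row = GenericRowData.of(StringData.fromString(resourceId));
        // The default RowKind is INSERT; marking the row as DELETE tells the
        // downstream sink to remove the matching row instead of inserting it.
        row.setRowKind(RowKind.DELETE);
        return row;
    }
}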
Re: New KafkaSource API : Change in default behavior regarding starting offset
Hello jing, This was the previous method in old Kafka consumer API, it has been removed in 1.15, so source code is not in master anymore, Yes I know for the new Offset initializer, committed offset + earliest as fallback can be used to have the same behavior as before I just wanted to know whether this is a changed behavior or I am missing something Bastien DINE Freelance Data Architect / Software Engineer / Sysadmin http://bastiendine.io Le mar. 14 juin 2022 à 23:08, Jing Ge a écrit : > Hi Bastien, > > Thanks for asking. I didn't find any call of setStartFromGroupOffsets() within > Flink in the master branch. Could you please point out the code that > committed offset is used as default? > > W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets() > is used, an exception will be thrown at runtime in case there is no > committed offset, which is useful if the user is intended to read from the > committed offset but something is wrong. It might feel weird if it is used > as default, because an exception will be thrown when users start new jobs > with default settings. > > Best regards, > Jing > > On Tue, Jun 14, 2022 at 4:15 PM bastien dine > wrote: > >> Hello everyone, >> >> Does someone know why the starting offset behaviour has changed in the >> new Kafka Source ? >> >> This is now from earliest (code in KafkaSourceBuilder), doc says : >> "If offsets initializer is not specified, OffsetsInitializer.earliest() will >> be used by default." from : >> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset >> >> Before in old FlinkKafkaConsumer it was from committed offset (i.e : >> setStartFromGroupOffsets() >> method) >> >> which match with this behaviour in new KafkaSource : : >> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST >> >> This change can lead to big troubles if user pay no attention to this >> point when migrating from old KafkaConsumer to new KafkaSource, >> >> Regards, >> Bastien >> >> -- >> >> Bastien DINE >> Data Architect / Software Engineer / Sysadmin >> bastiendine.io >> >