2019-10-28 10:14:21 UTC - Chris Miller: What about the inverse - using a Pulsar function to join two topics that use different schemas? Would that require storing any join state in the context? ---- 2019-10-28 12:11:41 UTC - Viji: I have to perform some initialization, like opening a connection to an external source, which is a one-time operation in my function. Similarly, I need to close the connection to the external resource when the function exits. What is the best option to achieve this? ---- 2019-10-28 12:16:33 UTC - Matt Mitchell: @Viji I have the same needs as well. Depending on what it is you’re trying to do, Pulsar IO connectors (PushSource) have a simple open/close life-cycle. I haven’t spent a lot of time understanding how/when those methods are called, but the examples show how: <https://github.com/apache/pulsar/blob/master/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java#L76> ---- 2019-10-28 12:22:01 UTC - Viji: @Matt Mitchell The external source provides additional information which is used to transform the input in the function. Is there an option to do a similar open/close within the Function? ---- 2019-10-28 12:25:44 UTC - Matt Mitchell: Your function will need to open the external connection one time for many function invocations? ---- 2019-10-28 12:27:09 UTC - Viji: exactly ---- 2019-10-28 12:28:23 UTC - Viji: One option is to initialize on the first invocation of the function and skip the initialization on subsequent invocations ---- 2019-10-28 12:29:03 UTC - Viji: Still, it is not clear when to close the connection to the resource ---- 2019-10-28 12:35:00 UTC - Matt Mitchell: Yeah. I’ve been trying to figure that out too. One option I was considering is to have a background thread clean things up after there’s been no activity for n time-units. And then, yeah, open the connection when the counter is == 0. ---- 2019-10-28 12:36:28 UTC - Matt Mitchell: You could also (maybe?) 
have another topic for control messages, like “start” and “stop”. But that may or may not work for your use case. In my case, the start and stop come from user actions, so it’s something I’m considering. ---- 2019-10-28 12:40:56 UTC - Viji: When the function is deleted by the user, or when the function broker is stopped, does the function get notified? ---- 2019-10-28 12:41:09 UTC - limadelrey: Hi guys. When using Apache Pulsar as a message queue, it seems to me that a consumer using a new subscription can't get data that was persisted before its subscription was created (I'm using the "retentionTimeInMinutes" property with "-1" in order to retain data even after it has been acknowledged). However, I would like to receive data from the beginning in order to get the current data state. Am I doing something wrong? Or is it the expected behaviour? ---- 2019-10-28 12:48:40 UTC - Matt Mitchell: Good question. Not from what I can tell. Maybe `@PostConstruct` and `@PreDestroy` will work? ---- 2019-10-28 12:58:37 UTC - Alexandre Berthaud: @Alexandre Berthaud has joined the channel ---- 2019-10-28 13:00:39 UTC - Sijie Guo: You have a couple of options:
1) if the subscription doesn’t exist before, you can open a consumer with SubscriptionInitialPosition.Earliest. 2) if you already have a subscription, you can call #seek to seek to the earliest position. 3) you can choose to use the Reader API; then you have the flexibility to choose where to start. ---- 2019-10-28 13:01:35 UTC - Alexandre Berthaud: Hello everyone! I'm trying to use the debezium connector with a PostgreSQL 11 database (latest version supported by debezium). debezium says it's compatible with the logical replication output of PostgreSQL, but the pulsar connector seems to only ever want to use the protobuf output. Is there a way to make it use the default PostgreSQL logical replication without the postgres-decoderbufs extension? Thanks! ---- 2019-10-28 13:03:59 UTC - Sijie Guo: @Viji @Matt Mitchell : 1) for sinks and sources, the open method is called when a sink/source instance is instantiated; the close method is called when the sink/source instance is closed. An instance can be closed when it is moved to another function worker, or explicitly stopped via the admin API. 2) currently functions don’t expose the start/stop API. We tend to keep the API as simple as possible. If there are requirements for exposing such an API, feel free to create a github issue so the community can pick it up and work on providing the API. ---- 2019-10-28 13:04:34 UTC - Sijie Guo: @jia zhai or @tuteng can help with this. ---- 2019-10-28 13:06:58 UTC - Viji: Thanks @Sijie Guo. I wanted to confirm that I had not overlooked the API. ---- 2019-10-28 13:09:04 UTC - jia zhai: ```debezium says it's compatible with the logical replication output of PostgreSQL``` @Alexandre Berthaud where did you find this? I would like to get more details on this first. ---- 2019-10-28 13:10:01 UTC - jia zhai: In Pulsar, we call the debezium connector directly, so there should be no difference. ---- 2019-10-28 13:13:56 UTC - Matt Mitchell: Thanks @Sijie Guo, that helps. Is there a way to feed sources (PushSource etc.) 
with input, similar to how functions work? For example, the first run of the source would read all records from the external source system. For the second run, I’d like to feed the source with what was produced in the previous run, to compare the lastModified values from each record against the source (if changed, fetch/produce; otherwise skip) - is that kind of workflow supported with sources? ---- 2019-10-28 13:14:56 UTC - xiaolong.ran: @Retardust <https://github.com/apache/pulsar/pull/5357> This change has been merged and will be released in 2.4.2. Thanks for your work on this. heart : Retardust ---- 2019-10-28 13:22:13 UTC - Alexandre Berthaud: I found this here: <https://debezium.io/documentation/reference/0.10/connectors/postgresql.html> ---- 2019-10-28 13:22:47 UTC - Alexandre Berthaud: "a logical decoding output plugin which has to be installed and configured in the PostgreSQL server, one of [...] pgoutput, the standard logical decoding plug-in in PostgreSQL 10+ (maintained by the Postgres community, used by Postgres itself for logical replication); this plug-in is always present, meaning that no additional libraries must be installed, and the Debezium connector will interpret the raw replication event stream into change events directly." ---- 2019-10-28 13:22:52 UTC - Matt Mitchell: I’m guessing that ^^ use case is outside the design aspects of the Pulsar IO connector API? ---- 2019-10-28 13:24:59 UTC - Alexandre Berthaud: maybe the connector simply isn't using debezium 0.10 yet? 
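As an aside on the open/close discussion between Viji and Matt Mitchell above: the lazy-initialization-plus-idle-reaper pattern they sketch can be written in plain Java roughly as below. `ExternalClient`, `LazyConnectionHolder`, and the idle timeout are illustrative names for this sketch, not part of the Pulsar Functions API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for an external resource (e.g. a database client).
class ExternalClient {
    static final AtomicInteger OPEN_COUNT = new AtomicInteger();
    ExternalClient() { OPEN_COUNT.incrementAndGet(); }
    String lookup(String key) { return "meta:" + key; }
    void close() { OPEN_COUNT.decrementAndGet(); }
}

// Opens the external connection on the first invocation only, and closes it
// once the function has been idle for longer than idleTimeoutMillis.
class LazyConnectionHolder {
    private final long idleTimeoutMillis;
    private ExternalClient client;
    private long lastUsedAt;

    LazyConnectionHolder(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    synchronized ExternalClient get() {
        if (client == null) {
            client = new ExternalClient();  // one-time init on first invocation
        }
        lastUsedAt = System.currentTimeMillis();
        return client;
    }

    // Called periodically by a background thread to reap an idle connection.
    synchronized void closeIfIdle() {
        if (client != null
                && System.currentTimeMillis() - lastUsedAt > idleTimeoutMillis) {
            client.close();
            client = null;                  // will reopen lazily on next use
        }
    }
}

public class LazyInitDemo {
    public static void main(String[] args) throws InterruptedException {
        LazyConnectionHolder holder = new LazyConnectionHolder(50);
        // Many "function invocations" share a single connection.
        for (int i = 0; i < 5; i++) {
            holder.get().lookup("key-" + i);
        }
        System.out.println("open connections: " + ExternalClient.OPEN_COUNT.get());
        Thread.sleep(80);                   // exceed the idle timeout
        holder.closeIfIdle();
        System.out.println("after idle reap: " + ExternalClient.OPEN_COUNT.get());
    }
}
```

In a real function, `get()` would be called from the process method and `closeIfIdle()` from a scheduled background thread, as Matt suggests.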
---- 2019-10-28 13:26:03 UTC - jia zhai: You are right, currently it is: <debezium.version>0.9.5.Final</debezium.version> ---- 2019-10-28 13:27:24 UTC - Alexandre Berthaud: alright, that explains it ---- 2019-10-28 13:27:38 UTC - tuteng: <https://debezium.io/documentation/reference/0.9/connectors/postgresql.html#pgoutput> ---- 2019-10-28 13:27:44 UTC - Alexandre Berthaud: huh ---- 2019-10-28 13:28:06 UTC - jia zhai: right, I am on the same page as Tuteng ---- 2019-10-28 13:34:56 UTC - Sijie Guo: @Matt Mitchell this sounds like a feedback-loop system. the source reads the first batch of messages and writes them to the output topic; the source then reads the second batch of messages from the external source, consumes from the output topic, and compares. do I understand your use case correctly? ---- 2019-10-28 13:37:02 UTC - Matt Mitchell: Yes, exactly @Sijie Guo ---- 2019-10-28 13:40:22 UTC - Alexandre Berthaud: well, then, I'm not sure how to make this work; I have tried setting `plugin.name` to `pgoutput` just in case (even though the debezium doc says the default format is pgoutput) and it fails to connect entirely, so that's weird; removing the plugin.name parameter causes the connector to try and use the protobuf output ---- 2019-10-28 13:40:40 UTC - Alexandre Berthaud: did either of you try this already by any chance? ---- 2019-10-28 13:46:47 UTC - Matt Mitchell: Could it be as simple as instantiating a Pulsar client within the PushSource, which could then be used to consume from whatever other topics are needed? ---- 2019-10-28 13:51:54 UTC - Sijie Guo: Yes. Or we can expose something in the context object that you can use, because there is already a Pulsar client in the instance. ---- 2019-10-28 13:52:31 UTC - Matt Mitchell: Ah ok ---- 2019-10-28 14:05:04 UTC - limadelrey: Thank you once again @Sijie Guo. All options seem perfectly fine. 
I'll stick to the first one since it allows me to perform an initial read and then use (what seems to me to be) the subscription's consumer offset afterwards. ---- 2019-10-28 14:22:01 UTC - Eno Thereska: Hi folks, about the Kafka wrappers: is there a plan to support newer versions of Kafka with their latest features (e.g. exactly once)? Or has the work stopped at Kafka version 0.10? Thanks. ---- 2019-10-28 14:40:04 UTC - tuteng: This feature seems to have been added in Postgres 10, so there should be no need to do anything on the connector side. I think you can set up the connector according to the document <http://pulsar.apache.org/docs/en/next/io-debezium-source/> and then enable this feature in the database. ---- 2019-10-28 14:53:52 UTC - Alexandre Berthaud: That's my thinking indeed, and I did just that, but I'm getting `ERROR: could not access file "decoderbufs": No such file or directory` in the postgresql logs and a subsequent error in the connector logs; which means it's trying to use the protobuf output ---- 2019-10-28 14:54:44 UTC - Alexandre Berthaud: I'm using the latest pulsar version (2.4.1) which is using debezium 0.9.2, which should be compatible with the logical replication of postgresql as well ---- 2019-10-28 14:58:41 UTC - Sijie Guo: There are two efforts around Kafka: 1) for the Kafka java wrapper, master already supports the 2.3.0 version. It will be released as part of pulsar 2.5.0, but it doesn’t implement the transaction API yet. We hope to add this functionality once Pulsar supports transactions (planned for 2.5.0 but might be delayed to 2.6.0). 2) there is another effort to support Kafka wire protocol compatibility at the broker or proxy level. ---- 2019-10-28 14:59:27 UTC - Ivan Kelly: @Sijie Guo are you planning transactions for the wrapper or the wire protocol impl? ---- 2019-10-28 15:02:00 UTC - Eno Thereska: Great, thanks! 
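For context on the Debezium connector setup being discussed, a PostgreSQL source config in the style of the io-debezium-source doc tuteng linked might look roughly like the sketch below. Hostnames, credentials, and topic names are placeholders; `plugin.name: pgoutput` is the setting Alexandre is experimenting with, and it only takes effect once the bundled Debezium version (0.10+) understands pgoutput:

```yaml
tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/pulsar-io-debezium-postgres-2.4.1.nar"  # placeholder path
parallelism: 1

configs:
  database.hostname: "localhost"     # placeholder
  database.port: "5432"
  database.user: "postgres"
  database.password: "changeme"      # placeholder
  database.dbname: "postgres"
  database.server.name: "dbserver1"
  # Debezium 0.9.x (bundled with Pulsar 2.4.x) defaults to the decoderbufs
  # plug-in, which requires the postgres-decoderbufs extension on the server;
  # pgoutput needs Debezium 0.10+:
  plugin.name: "pgoutput"
```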
---- 2019-10-28 15:10:13 UTC - tuteng: I think you may need to install <https://github.com/debezium/postgres-decoderbufs#building-and-installing-decoderbufs> first ---- 2019-10-28 15:10:46 UTC - Sijie Guo: @Ivan Kelly for 1), it is the wrapper. for 2), it will be the wire protocol. although we haven’t finalized the details, we are working on getting Pulsar’s own transactions in first (but keeping an open mind about making Kafka transaction support easier). ---- 2019-10-28 15:11:21 UTC - Alexandre Berthaud: Well, yes, to use the protobuf output, but the debezium doc says that I should be able to use the pglogical output without installing anything else; but I guess I will resort to using the protobuf output ---- 2019-10-28 15:16:27 UTC - Ivan Kelly: I suspect it may require a parallel implementation of transactions, at least partially, since consumer groups (and offset commit) have their own impl ---- 2019-10-28 15:22:27 UTC - Sijie Guo: yes. that’s the current problem ---- 2019-10-28 15:22:58 UTC - Sijie Guo: so we haven’t started the Kafka transaction implementation on KoP yet ---- 2019-10-28 15:25:13 UTC - Ivan Kelly: it could work if, instead of using managed ledger for offset storage, you use a pulsar topic. then to commit the offset you could just make it part of a pulsar transaction. You'd still have to deal with mutual exclusivity for the producer, but I think that's an easier problem ---- 2019-10-28 15:33:15 UTC - Sijie Guo: correct. we don’t use ML directly; since KoP is running as a protocol handler of the broker with access to `BrokerService`, it uses the topic instance at the broker level to write to an `__offset` topic. Hence we are able to pass a txn id as part of the writes. it works at a high level, but there are still many details to think through before wiring all of this together. ---- 2019-10-28 16:46:53 UTC - Chris Miller: I'm curious as to why there is even a distinct API for Reader vs Consumer. Shouldn't they be unified into one? 
It seems to me that would be simpler to understand, and provide more flexibility, than having to choose one over the other ---- 2019-10-28 16:47:38 UTC - Sijie Guo: Reader is essentially a “consumer” with a non-durable subscription. ---- 2019-10-28 17:00:24 UTC - Chris Miller: but there are differences, eg a Reader can only subscribe to a single topic ---- 2019-10-28 17:09:10 UTC - Chris Miller: Hmm, am I right in thinking then that Reader is in fact somewhat redundant, and everything a Reader can do can be achieved with the Consumer API (maybe with a little more code)? ---- 2019-10-28 17:10:00 UTC - Chris Miller: ReaderImpl.java seems to imply that is the case, since it's configuring then delegating everything to a consumer anyway ---- 2019-10-28 17:11:15 UTC - Sijie Guo: Reader is used by applications that manage offsets themselves. And it is for ordered consumption. Hence it is only for reading from a topic (partition) in sequence. Pulsar manages offsets for “Consumers”. ---- 2019-10-28 17:11:56 UTC - Sijie Guo: As I explained earlier, a Reader is a “Consumer” with a non-durable subscription. ---- 2019-10-28 17:12:09 UTC - Sijie Guo: That’s also how Reader is implemented. ---- 2019-10-28 17:12:24 UTC - Sijie Guo: Reader has its own use cases though ---- 2019-10-28 17:16:24 UTC - Chris Miller: Yes, I understand, but what I'm getting at is that Reader doesn't implement Consumer and it's not really clear to me why. For use cases like the one described in this thread, wouldn't it be better if some initialisation code could configure/create an appropriate Consumer, then pass it to the business logic that uses it? Currently if you want to take advantage of Reader (say for replaying old messages), it's awkward to use it with the same code that works on the "live" stream ---- 2019-10-28 17:19:29 UTC - Sijie Guo: For exactly-once processing, such as integration with Flink, a Reader is much better than a Consumer, because Flink manages the offsets in its state. 
You can use a Consumer with seek and filtering of duplicated messages to achieve similar things, but it is just not as clear as using a Reader. ---- 2019-10-28 17:19:54 UTC - Sijie Guo: with that being said, you can use a Consumer to achieve what a Reader can do. ---- 2019-10-28 17:20:32 UTC - Sijie Guo: The key principle when choosing Reader or Consumer is who manages the offsets. ---- 2019-10-28 17:21:02 UTC - Sijie Guo: if you want Pulsar to manage the offsets, use Consumer; if you want to manage the offsets yourself, use Reader. ---- 2019-10-28 17:23:57 UTC - Chris Miller: I see, thanks for that clarification, I hadn't realised that was the driving reason to choose one over the other. Sounds like I mostly want Consumer then :slightly_smiling_face: Though I'm still a bit unclear about recovery and replay scenarios on topics that are normally processed live. I'll mull it over... ---- 2019-10-28 18:28:07 UTC - Luke Lu: Do we have any timeline for #2 (Kafka protocol compatibility in the Pulsar proxy/broker)? ---- 2019-10-29 01:42:42 UTC - Fabio Brunetti: @Fabio Brunetti has joined the channel ---- 2019-10-29 02:29:10 UTC - Ali Ahmed: if people want to try out the go admin cli for pulsar with brew ```brew install <https://gist.githubusercontent.com/aahmed-se/02de9aa85988ce5257f9bc2cbcc24f4b/raw/093fa2e3c86ba7c1a20eedc6429ae2871ec1c549/pulsarctl.rb>``` +1 : Yong Zhang, xiaolong.ran, Sijie Guo, Ali Ahmed, tuteng, Yuvaraj Loganathan ---- 2019-10-29 04:11:14 UTC - Adam Abrams: @Adam Abrams has joined the channel ---- 2019-10-29 07:47:14 UTC - ShuJie Zhai: @ShuJie Zhai has joined the channel ---- 2019-10-29 08:13:32 UTC - Zabas: @Zabas has joined the channel ----
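To make the Reader-vs-Consumer distinction in the thread above concrete, here is a sketch using the Pulsar Java client; the service URL, topic, and subscription name are placeholders. With a Reader the application chooses its own start position (it manages the offsets), while a Consumer relies on a durable subscription whose cursor Pulsar advances on acknowledgment:

```java
import org.apache.pulsar.client.api.*;

public class ReaderVsConsumer {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder URL
                .build();

        // Reader: the application manages the position; start from the
        // earliest available message (or from any MessageId it has stored).
        Reader<byte[]> reader = client.newReader()
                .topic("persistent://public/default/my-topic")
                .startMessageId(MessageId.earliest)
                .create();

        // Consumer: Pulsar manages the position via a durable subscription.
        // A brand-new subscription can also begin at the earliest message
        // (option 1 from Sijie's answer); an existing one can seek() instead.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")     // placeholder name
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        consumer.acknowledge(msg);   // advances the Pulsar-managed cursor

        reader.close();
        consumer.close();
        client.close();
    }
}
```

Running this requires a live broker and the pulsar-client dependency, so it is a shape sketch rather than a self-contained demo.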
