#general
@wrbriggs: Has anyone configured realtime Kafka ingest with SASL / jaas auth (as in how Confluent handles auth for their managed clusters)?
@g.kishore:
@g.kishore: @elon.azoulay ^^
@g.kishore: You can pass all the properties to the underlying Kafka consumer by putting them in streamConfigs
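A minimal sketch of what such a streamConfigs block might look like for SASL/PLAIN over TLS, generated from Python so the JAAS string is quoted correctly. The property names `security.protocol`, `sasl.mechanism` and `sasl.jaas.config` are standard Kafka client settings; where exactly they belong inside streamConfigs (bare, or under a consumer-property prefix) depends on the Pinot version and is an assumption here, as are the helper name and the placeholder credentials.

```python
import json


def sasl_stream_configs(topic, brokers, api_key, api_secret):
    """Build a streamConfigs dict carrying Kafka client SASL settings (hypothetical helper)."""
    # JAAS line for the PLAIN mechanism; the trailing semicolon is required by the JAAS syntax.
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )
    return {
        "streamType": "kafka",
        "stream.kafka.topic.name": topic,
        "stream.kafka.broker.list": brokers,
        # Standard Kafka client properties. Assumption: they are passed through
        # to the consumer when placed directly in streamConfigs.
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }


configs = sasl_stream_configs("XXX", "kafka:9092", "<api-key>", "<api-secret>")
print(json.dumps(configs, indent=2))
```

The printed JSON can be pasted into the table config; json.dumps takes care of escaping the double quotes inside the JAAS value.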
@wrbriggs: @g.kishore Thanks! How about using basic auth for schema registry? Same mechanism?
@g.kishore: @wrbriggs looks like this is not supported and as you pointed out there is an issue already. We will add this asap
@wrbriggs: Thanks!
@elon.azoulay: We use basic auth. In terms of confluent properties you can do something like this:
@elon.azoulay: ```"streamType": "kafka", "stream.kafka.consumer.type": "LowLevel", "stream.kafka.topic.name": "XXX", "stream.kafka.broker.list": "kafka:9092", "realtime.segment.flush.threshold.time": "6h", "realtime.segment.flush.threshold.size": "0", "realtime.segment.flush.desired.size": "200M", "stream.kafka.consumer.prop.auto.isolation.level": "read_committed", "stream.kafka.consumer.prop.auto.offset.reset": "smallest", "stream.kafka.consumer.prop.group.id": "XXX", "stream.kafka.consumer.prop.client.id": "XXX", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "stream.kafka.decoder.prop.schema.registry.rest.url": "
@elon.azoulay: This is to use the avro confluent message decoder, is that what you wanted?
@wrbriggs: yes, where are you configuring the basic auth?
@wrbriggs: for the schema registry? I can’t find anything in the docs, and the code looks like it’s not passing those properties through to the `CachedSchemaRegistryClient` that you instantiate in the `KafkaConfluentSchemaRegistryAvroMessageDecoder`
@wrbriggs: but it would certainly make my life easier if it were already supported somehow
@elon.azoulay: It adds them as properties to the consumer.
@elon.azoulay: The kafka consumer interacts with the schema registry
@elon.azoulay: We do not use the auth parameters, but we do use schema registry and it's been working for us.
@elon.azoulay: Does that help?
@wrbriggs: Kind of? I’m a little confused, to be honest. You said you are using basic auth, but then you said you aren’t using auth parameters for the schema registry - so I’m trying to figure out how you are authenticating to it. It’s a little unclear.
@elon.azoulay: Maybe I misunderstood your question?
@wrbriggs: What I would expect to be able to do would be something like this: ``` "stream.kafka.decoder.prop.schema.registry.rest.url": "https://<schema-registry-url>.confluent.cloud", "stream.kafka.decoder.prop.schema.registry.basic.auth.credentials.source": "USER_INFO", "
@elon.azoulay: I was showing how we pass the schema registry and kafka low level consumer class stream configs
@wrbriggs: yes, that part makes sense - when you said “We use basic auth”, I think it threw me, because I didn’t see that anywhere in the provided example
@elon.azoulay: Ah, by basic I meant "no auth" :slightly_smiling_face:
@elon.azoulay: s/basic/no/ :slightly_smiling_face:
@elon.azoulay: So those params do not work for you?
@wrbriggs: They don’t, because our schema registry is secured and requires credentials
@wrbriggs: I believe that if you simply allowed properties under the `DECODER_PROPS_PREFIX` to flow into the config for the CachedSchemaRegistryClient (instead of only extracting the SSL properties, and tossing the rest), it would properly configure the underlying RestService - the logic is already there in the alternate constructors for CachedSchemaRegistryClient
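The pass-through idea can be sketched as follows. This is an illustration in Python rather than the actual decoder code: the function name is hypothetical, and the prefix value is simply the one visible in the config keys quoted earlier in the thread. It keeps every property under the decoder prefix instead of only the SSL ones.

```python
# Prefix as it appears in the stream config keys quoted in this thread.
DECODER_PROPS_PREFIX = "stream.kafka.decoder.prop."


def decoder_props(stream_configs):
    """Return every decoder property with the prefix stripped (hypothetical helper)."""
    return {
        key[len(DECODER_PROPS_PREFIX):]: value
        for key, value in stream_configs.items()
        if key.startswith(DECODER_PROPS_PREFIX)
    }


stream_configs = {
    "stream.kafka.topic.name": "XXX",
    "stream.kafka.decoder.prop.schema.registry.rest.url": "https://<schema-registry-url>.confluent.cloud",
    "stream.kafka.decoder.prop.schema.registry.basic.auth.credentials.source": "USER_INFO",
}

# Everything under the prefix survives, so auth settings would reach the client config.
client_config = decoder_props(stream_configs)
print(client_config)
```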
@elon.azoulay: Sounds reasonable. I would create a github issue, someone can contribute a fix for that.
@wrbriggs: There’s an issue, and I’m testing a (hacky) fix locally to see if that actually works
@elon.azoulay: nice, good luck!
@wrbriggs:
@wrbriggs: I didn’t create it, but it’s the same problem I’m hitting
@karinwolok1: :loudspeaker: If anyone is using Kafka alongside Apache Pinot and wants to become a conference speaker:
@ahmed.jolani: @ahmed.jolani has joined the channel
@krishna080: @krishna080 has joined the channel
@dhurandargeek: @dhurandargeek has joined the channel
@dhurandargeek: Query regarding Apache Pinot: what's the typical OLAP cube size one can host in Apache Pinot? We have a cube which is almost 50 TB. It has some dimensions with very high cardinality, but since the raw data is more than 5 petabytes, 50-100 TB is still a reasonable aggregation. We want interactive performance from our OLAP store since it would power important dashboards and drill-downs. So we want to know how much data we can push into Apache Pinot.
@mayanks: With Pinot, you don't need to precompute all the cubes upfront (at write time). You can pre-aggregate your raw data, and the cubing will happen at read time based on the query.
@mayanks: If you are saying your single cube (which you will further drill down on) is 50 TB, then Pinot scales horizontally, so theoretically there isn't a limit on data size. However, depending on your use case, you may need to enable some optimizations (e.g. partitioning). If you can share a bit more about your use case, we can help figure out how to solve it using Pinot.
@dhurandargeek: I see. We already have the cube deployed, but it is persisted in S3 with Presto in front
@dhurandargeek: We expect it to grow to around 150 TB in 1 year or so, and then it would stabilise
@dhurandargeek: but we are not getting interactive performance, and for that we are looking at Apache Pinot
@mayanks: Let's move to <#C011C9JHN7R|troubleshooting>
@dhurandargeek: Mostly sub-second latency. Our current latency with Presto and S3 is around 10+ seconds given the volume
@dhurandargeek: sure
@wrbriggs: I have a boneheaded question
@wrbriggs: Let’s say I want to query a table based on a datetime column, and I want to aggregate all records that have a timestamp within the last 10 minutes
@wrbriggs: in a more traditional SQL DB, I would write something like “SELECT dim1, dim2, DISTINCTCOUNTHLL(dim3) FROM myTable WHERE eventTimestamp >= NOW() - INTERVAL 10 MINUTES”
@mayanks: That should work in Pinot as well
@wrbriggs: ```org.apache.pinot.sql.parsers.SqlCompilationException: org.apache.calcite.sql.parser.SqlParseException: Encountered "- INTERVAL 10" at line 1, column 194.```
@wrbriggs: presumably it doesn’t like the interval subtraction, but I can’t find the correct syntax in the docs
@mayanks: Oh, sorry, I just read until now().
@mayanks: You will need to specify the actual interval value I think.
@wrbriggs: Essentially, I can’t figure out how to specify a time literal that is compatible with my timestamp column for doing math on it
@mayanks: As in, your app doesn't know the time unit?
@mayanks: It seems like a good feature ask. Should be easy to add, @fx19880617 wdyt?
@wrbriggs: Let me rephrase - how can I subtract time from the value returned by NOW(), if it’s possible? It seems like a common use case.
@fx19880617: i think now() is supported as a milliseconds epoch value
@fx19880617: let me find an example query
@wrbriggs: thanks
@mayanks: No, the Interval
@mayanks: @fx19880617
@wrbriggs: I’m happy to directly modify NOW() as well, but it errors out on me for that, too
@wrbriggs: ```SELECT COUNT(*) FROM myTable WHERE eventTimestamp >= (NOW() - 10000)```
@wrbriggs: (e.g., give me everything in the last 10 seconds)
@wrbriggs: ```[ { "errorCode": 200, "message": "QueryExecutionError:\norg.apache.pinot.core.query.exception.BadQueryRequestException: Cannot convert value: '1.608319112661E12' to type: LONG\n\tat org.apache.pinot.core.query.pruner.ColumnValueSegmentPruner.convertValue(ColumnValueSegmentPruner.java:261)\n\tat org.apache.pinot.core.query.pruner.ColumnValueSegmentPruner.pruneRangePredicate(ColumnValueSegmentPruner.java:185)\n\tat org.apache.pinot.core.query.pruner.ColumnValueSegmentPruner.pruneSegment(ColumnValueSegmentPruner.java:105)\n\tat org.apache.pinot.core.query.pruner.ColumnValueSegmentPruner.prune(ColumnValueSegmentPruner.java:76)\n\tat ```
@fx19880617: hmmm
@fx19880617: ```select * from mytable where eventTimestamp > cast(now() - 10000 as long) limit 10```
@fx19880617: interval syntax is not yet supported
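Until INTERVAL is available, one workaround is to compute the cutoff on the client and send it as a plain integer literal, which sidesteps both the parser error and the double-to-LONG conversion error shown above. A minimal sketch, assuming `eventTimestamp` stores epoch milliseconds; the function name is hypothetical and the table and column names are the ones used in the thread.

```python
import time


def last_n_minutes_query(minutes, now_ms=None):
    """Build a query for rows newer than `minutes` ago, using an integer cutoff."""
    if now_ms is None:
        # Current time as epoch milliseconds, truncated to an int.
        now_ms = int(time.time() * 1000)
    cutoff_ms = int(now_ms) - int(minutes) * 60 * 1000
    # The cutoff is rendered as a plain integer, never in float/scientific notation.
    return f"SELECT COUNT(*) FROM myTable WHERE eventTimestamp >= {cutoff_ms}"


print(last_n_minutes_query(10))
```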
@fx19880617: can you create an issue for this : @wrbriggs
@mayanks: @amrish.k.lal fyi ^^
@amrish.k.lal: @wrbriggs Please create a ticket with detailed description (and maybe a few examples) of the feature and I will look into implementing it.
@mayanks: @amrish.k.lal in the query below, the request is to support INTERVAL; ```"SELECT dim1, dim2, DISTINCTCOUNTHLL(dim3) FROM myTable WHERE eventTimestamp >= NOW() - INTERVAL 10 MINUTES"```
@mayanks: Also, `select * from mytable where eventTimestamp > cast(now() - 10000 as long) limit 10` requires a cast; maybe we can see if that can be fixed (assuming other SQL engines don't require it)
@fx19880617: agreed
@fx19880617: we should fix this
@amrish.k.lal: ok, so seems like there is a bug fix here and along with that I will look into supporting
@fx19880617: great
@amitchopra: Quick question - has anyone set up AWS Athena with Pinot, given that Athena is essentially Presto underneath?
@fx19880617: I haven't. My feeling is that it requires pinot plugins to be installed in Athena, which should be provided by AWS
@amitchopra: Got it. I was trying to check whether someone had experience with it working or not. I will probably try it and update here for future users
@fx19880617: Agree
#random
@ahmed.jolani: @ahmed.jolani has joined the channel
@krishna080: @krishna080 has joined the channel
@dhurandargeek: @dhurandargeek has joined the channel
#troubleshooting
@taranrishit1234: { "schema": {"type":"struct","fields":[{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"vhnumber"},{"type":"string","optional":true,"field":"phnnumber"},{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"password"},{"type":"string","optional":true,"field":"vehicleType"},{"type":"int32","optional":true,"field":"status_id"}],"optional":false,"name":"driver"}, "payload": {"name":"ss","vhnumber":"123","phnnumber":"123","id":17,"password":"2060","vehicleType":"ppol","status_id":10} } This is the Kafka event as seen in the consumer. How do I convert this to a Pinot schema? The only data needed is the "payload" attribute. How do I write a custom decoder for it?
@npawar: Few options: 1. You can just have the whole payload string as a field using json_format UDF:
@g.kishore: not yet. PR is still in review
@taranrishit1234: We are streaming MySQL data to Kafka and would like to stream the same output (the data above) into a Pinot table. Can we do that? We ask because Pinot accepts only a one-level JSON format
@npawar: in that case you can use option 1 or 2
@fx19880617: I think json scalars in transform functions should work
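For reference, the unwrapping being asked about amounts to the following. This is a plain Python sketch of the logic only, not a Pinot decoder or transform function: it parses the schema/payload envelope from the message above and keeps just the flat `payload` object. The function name is hypothetical and the sample is trimmed to a few fields.

```python
import json


def extract_payload(message_value):
    """Parse a schema/payload envelope and return only the flat payload record."""
    envelope = json.loads(message_value)
    payload = envelope["payload"]
    if not isinstance(payload, dict):
        raise ValueError("expected 'payload' to be a JSON object")
    return payload


# Trimmed-down version of the event shown above.
sample = json.dumps({
    "schema": {"type": "struct", "optional": False, "name": "driver"},
    "payload": {"name": "ss", "vhnumber": "123", "id": 17, "status_id": 10},
})

row = extract_payload(sample)
print(row)
```

Each key of the resulting record would then map to one column in the Pinot schema.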
@ahmed.jolani: @ahmed.jolani has joined the channel
@krishna080: @krishna080 has joined the channel
@dhurandargeek: @dhurandargeek has joined the channel
@mayanks:
@mayanks: @dhurandargeek what's the format at rest?
@dhurandargeek: Parquet now
@dhurandargeek: But we are facing issues in a couple of ways. Presto doesn't support data skipping, and in our case the dimensions are many (approx. 25). Bloom-filter-based data skipping would help us. We are working on that internally, since we also use Presto to query our data lake, which is 5 petabytes. The current partitioning is mostly hourly, daily, and monthly, but we need others too, like Companies, Countries, and User Segments, since we now see queries across these dimensions as well
@dhurandargeek: Cost is a big deal for us since it's a SaaS offering
@g.kishore: short answer - you can push 100's of TB of data
@dhurandargeek: okay
@g.kishore: there are many things you can do to optimize on speed and cost
@dhurandargeek: As far as I have read and tested, it would require its own storage in addition to S3
@g.kishore: yes
@dhurandargeek: i see
@g.kishore: you can use the persistent volume in AWS to host the data
@dhurandargeek: Yes the idea is to use EBS volumes
@rishbits1994: @rishbits1994 has joined the channel
@dhurandargeek: Our data has more than 400 different dimensions. The cube only has 25 of them, but we are planning to increase that. We are aware that adding a new dimension would multiply the volume by the cardinality of the new dimension (in the worst case). Is there a recommendation on the number of dimensions too? As in, how many dimensions can I add to the "group by"?
@g.kishore: It's the product of the cardinalities of the group-by columns. By default we limit that to 100k in the group by query. (Note this is based on the actual query, not on the actual data)
@g.kishore: you can increase this limit but that will require you to up the memory of the server as needed
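A quick way to sanity-check a planned group-by against that default is to multiply the cardinalities of the grouped columns. A small sketch; the column names and cardinalities below are hypothetical, and the 100k figure is the default quoted above.

```python
from math import prod

DEFAULT_GROUP_LIMIT = 100_000  # default limit quoted in the thread


def worst_case_groups(cardinalities):
    """Upper bound on the number of groups: product of the group-by column cardinalities."""
    return prod(cardinalities.values())


# Hypothetical cardinalities for a three-column group-by.
cards = {"os_type": 5, "country": 200, "segment": 50}
groups = worst_case_groups(cards)
print(groups, groups > DEFAULT_GROUP_LIMIT)

# Adding one more low-cardinality column can already cross the limit.
more_groups = worst_case_groups({**cards, "cohort": 12})
print(more_groups, more_groups > DEFAULT_GROUP_LIMIT)
```

This is only a worst-case bound; the number of groups actually present in the data is usually much smaller.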
@g.kishore: 400 dimensions is not a problem since it's columnar
@dhurandargeek: Thank you, that's helpful. I did see that Pinot supports dictionary encoding, and we have lots of dimensions with low cardinality like os-version, os-type, ips, state, country, segment, cohort, seasonality, etc.
@g.kishore: yes.
@mayanks: Yes, low cardinality columns will compress very well.
@ken: @g.kishore what happens if the cardinality of a column being used for group by is > 100K? If the query has an order by, will it use a priority queue to keep around the (approximate) top results?
@g.kishore: yes
@ken: So unless the data is weirdly skewed, if our LIMIT is something significantly lower (like 1000) then the final results should be exact, or nearly exact.
@g.kishore: yes
@g.kishore: this is really to protect against bad queries that might be run accidentally like select memberId, sum(views) from T and cardinality of memberId is in millions
@g.kishore: we still execute the query but only return the top X, and once the priority queue reaches 100k, we drop new entries and only keep updating the existing keys
@g.kishore: you can see this in the response stats
@g.kishore: `numGroupsLimitReached`
@ken: OK, thanks. What if it’s something like `select memberId,sum(views) from T group by memberId order by sum(views) asc limit 1000`? Here the “top” results are the ones with the smallest number of views. Will the priority queue use the order by information to correctly keep the memberId groups with the smallest sum?
@g.kishore: the priority queue is set up based on the order by clause
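The "drop new entries, keep updating existing keys" behaviour can be illustrated with a toy aggregation. This is a deliberately simplified sketch, not Pinot's implementation: it uses a plain dict instead of a priority queue driven by the order by clause, and the function name and sample rows are made up. The returned flag plays the role of `numGroupsLimitReached`.

```python
def group_by_sum(rows, num_groups_limit, top_n, descending=True):
    """Sum values per key, refusing new keys once num_groups_limit groups exist."""
    sums = {}
    limit_reached = False
    for key, value in rows:
        if key in sums:
            sums[key] += value      # existing groups keep being updated
        elif len(sums) < num_groups_limit:
            sums[key] = value       # still room for a new group
        else:
            limit_reached = True    # new group dropped
    ordered = sorted(sums.items(), key=lambda kv: kv[1], reverse=descending)
    return ordered[:top_n], limit_reached


rows = [("a", 5), ("b", 1), ("a", 2), ("c", 9), ("b", 4)]

# With a limit of 2 groups, "c" arrives too late and is dropped.
top, reached = group_by_sum(rows, num_groups_limit=2, top_n=1)
print(top, reached)

# With enough room, the exact answer comes back.
exact_top, exact_reached = group_by_sum(rows, num_groups_limit=10, top_n=1)
print(exact_top, exact_reached)
```

In the first call the true top group is lost, which is why a result with the limit flag set should be read as approximate.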
@canerbalci: Hi folks. I’m trying to debug this github-actions
@fx19880617: can you try to rebase to origin master?
@canerbalci: sure, let me try
@fx19880617: I think there is a recent change on this BatchConfigProperties
@fx19880617: there was some refactoring there
@canerbalci: Plain rebase didn’t seem to fix the problem. I guess I need to do some refactoring on my test
@fx19880617: sure
@fx19880617: also have you tried to rebuild pinot from root?
@fx19880617: mvn clean install -DskipTests
@canerbalci: Not this time, but I just pushed with the change (refactored the test) and it seems to have passed the point of the compilation failure!
@fx19880617: :stuck_out_tongue:
@canerbalci: Thanks for the help Xiang! But I’m curious why my test picks up a newer version of the pinot-spi package, rather than what exists in my commit. Is it due to github-actions setup?
@fx19880617: I think this is for PR
@fx19880617: yes github-action
@fx19880617: the build may try to rebase, I guess
@canerbalci: I see. I’ll see if there is an easy way to make those builds isolated.
@canerbalci: Also can you please take a look at this (somewhat related) two liner quick fix:
#feat-pravega-connector
@fpj: > one consumer consumer from pravega and then writes to Pinot
I'm not sure which suggestion that was, can you elaborate?
> we plan to add a write api soon
> may be we should integrate with Pravega once we have write API?
How will the write API change the picture? To restate my concerns:
• Dealing with scaling segments/shards/partitions is difficult, and it is not a difficulty introduced by Pravega specifically; it is inherent to a stream changing its parallelism dynamically. The current API changes proposed seem to expect the plugin implementation both to expose segments individually, so that they can be arranged in groups at table creation time, and to deal transparently with the order of segments/shards/partitions.
• It might add to the end-to-end latency if we process events in batches by requesting data between a start and an end. I wonder if that will affect the latency perceived by real-time queries.
At least for Pravega, we would be able to address those concerns if we use the event stream API. This API requires the use of either coarse-grained checkpointing or opaque position objects, and the assignment of segments isn't deterministic in the case of rollbacks. I believe these are the main contention points.
One concern on the Pinot side is that you need Pinot segments to be deterministically written. I'm wondering whether adding a hint to the event saying which segment it is coming from would help to achieve this goal in the absence of a deterministic schedule. A segment is owned by a single reader at a time, and that reader is responsible for adding to that segment. If the segment changes ownership, then the change happens at checkpoint time. At a checkpoint, all readers in the group receive a special event indicating the checkpoint. Would something like this work?
#pinot-perf-tuning
@elon.azoulay: Hi, we had some server crashes. I was able to take some heap histograms (couldn't get a heap dump though). Like last time, we saw that DirectR references were taking up the most space in the heap (refs to the mmapped segments). We found that with `-XX:SoftRefLRUPolicyMSPerMB=0` combined with Java 11 (PR coming in the next 2 days), GC clears more soft references than before. Found this on the JVM mailing list:
@elon.azoulay: Any thoughts/experiences regarding this? I will let you know how it goes:)
#getting-started
@taylorboren86: @taylorboren86 has joined the channel
