#general
@angelina.teneva: @angelina.teneva has joined the channel
@laabidi.raissi: Hello @karinwolok1 I am a Java software engineer. I hava experience with classic Java web applications (Spring, Hibernate), also with relation databases and Elasticsearch. Now I am considering some data engineering experience and I discovered Pinot on Twitter. I am in my first steps following some YouTube tutorials
@karinwolok1: Cool! Welcome! Do you have a use for it right now, or just thought it was an interesting technology to learn about?
@karinwolok1: Kafka Summit London looking for presenters! :slightly_smiling_face: CFP closes Feb 2022. Submit your talk now! :partying_face:
@diogo.baeder: Hi folks! I have a question about segment thresholds. If I have something like this: ``` 'realtime.segment.flush.threshold.rows': '10000', 'realtime.segment.flush.threshold.time': '24h', 'realtime.segment.flush.desired.size': '100M',``` does this mean that the first value that gets reached from the above ones determines that the segment will be flushed? Or is it the last value reached that determines that? For example, if a segment has been filling for 24h already, but has only 200 rows and 10M in size, does it get flushed because it reached the 24h mark?
@mark.needham: soooo.... The High Level Realtime Segment Data Manager • Checks `realtime.segment.flush.threshold.rows` on every record that gets processed. If that value is exceeded it flushes. • If it hasn't been exceeded then there's a task that checks once a minute if `realtime.segment.flush.threshold.time` has been exceeded and flushes if it has. That task also checks the rows threshold too in case it was missed by the first check I guess. If the time threshold has been exceeded and no new documents have been indexed it won't flush the segment. But if there have been > 0 documents indexed it will flush. And then the Low Level Realtime Segment Data Manager checks: • `realtime.segment.flush.desired.size` gets translated into a number of rows threshold based on the size of each row in the previous segment. ``` * The formula used to compute new number of rows is: * targetNumRows = ideal_segment_size * (a * current_rows_to_size_ratio + b * previous_rows_to_size_ratio) * where a = 0.25, b = 0.75, prev ratio= ratio collected over all previous segment completions``` I'm not entirely sure how it switches between those two data managers, but I expect @npawar or @mayanks will know. But assuming that it's using the high level one, for your example: ```For example, if a segment has been filling for 24h already, but has only 200 rows and 10M in size, does it get flushed because it reached the 24h mark?``` It would flush at the 24h mark from my understanding.
@mayanks: @diogo.baeder please check out the stream config section here
@mayanks: Please let me know if it is still unclear, will fix the docs
@diogo.baeder: Awesome, that's very helpful, @mark.needham! Thanks @mayanks, I've seen the docs but it's not very clear how the flushing happens when multiple flushing rules are configured; Mark's explanation above was what I needed, just to understand how this will happen.
@kchavda: I was wondering about the same thing. So each segment should have a consistent size correct? I've got this set-up but my segment sizes start off small and then get larger
@diogo.baeder: Overall, then, my understanding is that it's a "whatever limit gets hit first" case, then.
@mark.needham: so I'm not an expert of this code, but this is the place where it decides when to flush the segment:
@g.kishore: Yes, what ever reaches first.. but note that under the hood there are only two thresholds • rows • Time Size gets converted into rows by looking at previous segments.. it takes a few iterations for Pinot to get this right as it needs to learn the mapping between rows and size
@diogo.baeder: Got it... thanks @g.kishore!
@mark.needham: @g.kishore what does it do if you specify both rows and size? Does it ignore the rows threshold in favour of size or something?
@g.kishore: Good question.. I don’t know
@g.kishore: Typically you use one or the other
@mayanks: Typically whatever threshold meets first is honored. The only exception is desired size, which take effect only when rows is set to zero (as in the docs).
@diogo.baeder: Ah, got it. Thanks man! :slightly_smiling_face:
@npawar: 1. Rows has to be 0 for size to take effect 2. No matter what size you specify, it starts off with 100k rows, and then slowly ramps up the rows to get to the desired size 3. If you specify only rows, and no size, then the rows get divided amongst all consuming partitions on a server (so if you specify rows 10k, and use 1 server and have 3 partitions and 1 replica, each segment will use 3333 as rows threshold) 4. Regardless of whether you go with size or just rows, you can (and always should) set a time threshold, so that it serves as the ultimate safety check. 5. Having said all this, typically you can just go with 0 rows, 24h time, and 200M size
@npawar: I think we should add a small section in docs for "setting thresholds"
@diogo.baeder: That's very useful info, thanks a lot @npawar! And yes, it would be a great addition to the docs, I agree
@mark.needham: @npawar I can do that. I guess it's more of less summarise this thread
@npawar: thank you Mark! yes, summarizing this thread. some points to note • would also be helpful to include the defaults that kick in, if nothing is specified • there’s some property name changes between 0.6 and 0.7
@derek.p.moore: @derek.p.moore has joined the channel
@joseph.kolko: @joseph.kolko has joined the channel
@xiangfu0: Hello Community, We are pleased to announce that Apache Pinot 0.9.0 is released! Apache Pinot is a distributed columnar storage engine that can ingest data in real-time and serve analytical queries at low latency. The release can be downloaded at
@diogo.baeder: That's awesome, folks, congratulations! Are there plans yet for a Docker image with the new version?
@xiangfu0: yes, docker image is also published:
@diogo.baeder: Ah, nice, thanks!
#random
@angelina.teneva: @angelina.teneva has joined the channel
@derek.p.moore: @derek.p.moore has joined the channel
@joseph.kolko: @joseph.kolko has joined the channel
#troubleshooting
@kkmagic99: Hi Team, We are in the process of testing related to UPSERT. I am testing upsert by creating a table with the following config: In a development environment with replicas = 1, partition = 1, broker = 1, and server = 1, id-based upsert worked fine as needed. However, in an operating environment with replicas = 2, partition = 3, broker = 6, server = 6 One id is upsert to 3 rows. Is there a place where the config don't work? -- Upsert Schema { "schemaName": "upsert_test", "primaryKeyColumns": ["id"], "dimensionFieldSpecs": [ { "name": "id", "dataType": "STRING" } ], "metricFieldSpecs": [ { "name": "dt1", "dataType": "DOUBLE", "transformFunction": "JSONPATHDOUBLE(\"dt\", '$.1', -999.0)" } ], "dateTimeFieldSpecs": [ { "name": "ts_asia_seoul_datetime", "transformFunction": "toDateTime((ts*1000)+(timezoneHour('Asia/Seoul')*3600000), 'yyyy-MM-dd HH:mm:ss')", "dataType": "STRING", "format": "1:SECONDS:EPOCH", "granularity": "1:SECONDS" }, { "name": "ts_utc", "transformFunction": "ts*1000", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" } ] } -- FULL Upsert TABLE { "tableName": "upsert_test", "tableType": "REALTIME", "tenants": { "broker": "DefaultTenant", "server": "DefaultTenant" }, "ingestionConfig": { "filterConfig": { "filterFunction": "Groovy({ts < 1000000000}, ts)" } }, "segmentsConfig": { "schemaName": "upsert_test", "timeColumnName": "ts_utc", "timeType": "DAYS", "replicasPerPartition": "2", "retentionTimeUnit": "DAYS", "retentionTimeValue": "5", "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy" }, "tableIndexConfig": { "loadMode": "MMAP", "invertedIndexColumns": [ "id" ], "streamConfigs": { "streamType": "kafka", "stream.kafka.topic.name": "monitor_1", "stream.kafka.broker.list": "upsert_test-0.kafka-headless.prod.svc.cluster.local:9092,upsert_test-1.kafka-headless.prod.svc.cluster.local:9092,upsert_test-2.kafka-headless.iotops-prod.svc.cluster.local:9092", "stream.kafka.consumer.type": "lowlevel", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.time": "24h", "realtime.segment.flush.desired.size": "450M" } }, "metadata": { "customConfigs": {} }, "routing": { "instanceSelectorType": "strictReplicaGroup" }, "upsertConfig": { "mode": "FULL", "comparisonColumn": "ts_utc" } }
@angelina.teneva: @angelina.teneva has joined the channel
@jmeyer: Hello :slightly_smiling_face: Quick question regarding `ingestionConfig` on REALTIME tables Is there any way to `jsonPathString` + further process the result with `Groovy` in `transformConfig` ?
@jmeyer: Say we have ```{"data": { "someKey": <needs_processing> } }``` I'd like to process `data.someKey` with `Groovy` It's possible from the query side with : ```groovy('{"returnType":"LONG","isSingleValue":true}', 'Long.valueOf((arg0.substring(0, 8) + arg0.substring(18)), 16)', userId)``` Basically need to do this ahead of time in the `transformConfig` of a REALTIME table
@jmeyer: Actually, this limitation doesn't seem to exist anymore And I managed to use an already transformed field ! :tada:
@mayanks: Thanks @jmeyer, could you paste an example that worked for you? We will fix the doc, cc @mark.needham @dunithd
@jmeyer: Sure, here it is: ```"ingestionConfig": { "transformConfigs": [ { "columnName": "userOid", "transformFunction": "jsonPathString(data, '$.userId')" }, { "columnName": "userId", "transformFunction": "Groovy({Long.valueOf((userOid.substring(0, 8) + userOid.substring(18)), 16)}, userOid)" } ] }```
@jmeyer: As we can see, to populate `userId` we use column `userOid` which is itself the output of a transformation
@jmeyer: Note that we are using `0.7.1-afa4b252ab1c424ddd6c859bb305b2aa342b66ed` And I haven't tested any other combinations of `transformConfigs`, so maybe some limitations still apply (e.g. order in which they are listed, maybe ?)
@npawar: No limitations should apply.. we added support for chaining, early this year. Thanks for pointing out!
@jmeyer: Fantastic news, thanks for that feature, saved my day today :smile:
@trustokoroego: Hello :hand: , I deleted a realtime table from my Pinot cluster, but I can still see the consumer group with "empty name" created by pinot on the topic still keeping track of the consumer lags. See image below: Since pinot is using lowlevel consumers, there is actually no real concept of consumer group, and since the consumer group name is "blank" I am not able to delete it. While this may not affect any new realtime table created to consume this topic, is there no way to ensure the consumer is removed from the topic when the realtime table is removed?
@mayanks: @npawar @ssubrama
@ssubrama: @trustokoroego where do you see the consumer group with empty name ? (I assume you are using Kafka). If you never created an HLC table, then the consumer group part should not be there. Not sure where you are trying to delete it from. Zookeeper? Can you specify the full path name where it exists?
@trustokoroego: @ssubrama I see the consumer group with empty name in kafka. I am using AKHQ to visualize the topics and the related consumers(the screen shot). If I also use kafka-consumer-group.sh tool to check the list of consumers on the topic, I still see it.
@trustokoroego: is shows like this: consumer-group1 consumer-group2 consummer-froup3 the space between 2 and 3 is the blanck consumer group
@trustokoroego: another strange thing I just observed now after creating a new table from a different pinot cluster is that the consumer lags in the topic were cleaned up by the new real-time table I just deployed to another pinot cluster
@ssubrama: Glad that the table looks OK. So, LLC tables also create stream consumers (in your case, kafka consumers) to get metadata about the stream (e.g. get the number of partitions, the offset in each of them, etc.). We get this information every once in while from pinot controllers. I don't think we get metadata from pinot servers at all, although there could be some corner cases where we do. I need to check again. Can you verify a couple of things: (1) Are the segments for the (realtime) table LLC segments, or do you see HLC segments as well? (segment name that follows the pattern `tableName__partitionNumber__sequenceNumer__timestamp` are LLC segmemts). (2) Are the consumers in the consumer group that you see from pinot-controller hosts?
@ssubrama: I am not sure what the blank consumer group is about. Also, please confirm the version of kafka plugin you are using.
@trustokoroego: @ssubrama This is one of the segments.
@ssubrama: if all segments follow this naming pattern, you are good. If you are checking kafka on the service side, I am assuming there is some way to check which hosts are in the consumer group. If there is no way, then I am not sure what to do. If there is anything you can do to debug as to when and who created those consumer groups? Is there one group for all tables, or one per table added? @npawar do you think there is a metadata fetcher leak in the kafka 2.6 plugin? Can you verify?
@ssubrama: Other than this, I seem to have hit a wall here.
@npawar: there is no 2.6 plugin. the plugin should still be kafka-2.0, which hasnt been touched in a long time. i think the 2.6 version here is of thei kafka deployment
@trustokoroego:
@mark.needham: @mark.needham has joined the channel
@jain.arpit6: Hi, I am executing a inner join query in Presto but getting below error : Error when hitting host with Pinot query " select validfrom, Id, InsertTimsttamp from trade_realtime where (id = '1234') limit 2147483647 My original query is like this: select a.Id, max(a.ValidFrom) as MaxValidFrom, a.InsertTimeStamp from mypinotcluster.default.trade a inner join ( select Id, Max(InsertTimeStamp) as MaxInsertTime from mypinotcluster.default.trade group by Id ) b on a.Id = b.Id and a.InsertTimeStamp = b.MaxInsertTime AND a.Id='4-467125-467125 -0-50' group by a.Id, a.InsertTimeStamp LIMIT 20; Looks like Presto is computing the result in memory instead of executing in Pinot. Any ideas how can I make it work?
@mayanks: @jain.arpit6 The Presto-Pinot tries to push as much computation down to Pinot as Pinot can support. From what you are describing it seems it is trying to only push down the filter to Pinot and get all records that match that filter (hence INT.MAX_VALUE as the limit).
@mayanks: Can you run `select count(*) from trade_realtime where (id = '1234')` and share the results?
@jain.arpit6: Just found some config in broker which is equal to this number 2147483647
@richard892: that number is `Integer.MAX_VALUE`
@derek.p.moore: @derek.p.moore has joined the channel
@joseph.kolko: @joseph.kolko has joined the channel
#pinot-dev
@troy: @troy has joined the channel
@ken: Hi all - we’ve noticed when doing a big metadata push (1200 segments) that a lot of the time is spent downloading/expanding the segment from deep store (HDFS) to the local machine, so that the metadata file from the segment can be extracted and used to build the request to the controller. It should be possible to open a stream to the file and extract only that bit of the segment. Does this make sense as a reasonable enhancement?
@ssubrama: I don't understand the "build the request to the controller" part. You are doing a data push to pinot by placing data in hdfs and then a metadata push to the controller, right? Are you concerned about the servers loading the segments from deepstore? that cannot be avoided, right? Or am I getting the problem wrong?
@richard892: I think what Ken's getting at is do we need to read (or even download) the entire segment file? Can we start streaming the file, read the metadata and then abort once it's been read. Did I get you right @ken?
@ken: Hi @ssubrama by “build the request”, I mean that the metadata file in the segment has to be read and converted into the HTTP request to the controller (thus “building the request”). And yes, @richard892 is right about what I’m proposing. I tried it with this snippet of code ``` File inputFile = new File("/Users/kenkrugler/Downloads/adbeat/pinot-segments-ads/ads_us_2020-11_00.tar.gz"); File outputFile = new File("./build/metadata.properties"); TarArchiveInputStream tis = new TarArchiveInputStream(new GZIPInputStream(new FileInputStream(inputFile))); TarArchiveEntry tarEntry; while ((tarEntry = tis.getNextTarEntry()) != null) { if (tarEntry.isFile() && tarEntry.getName().endsWith("/metadata.properties")) { FileOutputStream fos = new FileOutputStream(outputFile); IOUtils.copy(tis, fos); fos.close(); break; } } tis.close();``` Given the ordering of files in segments I’ve seen, the metadata.properties occurs before the big pieces (columns.psf & star_tree_index), e.g. 15K of reading/decompression vs. 200-300MB for my segments.
@ken: Hmm, we’d also need the creation.meta file, which is 16 bytes, and located just before the metadata.properties file. See `SegmentPushUtils.generateSegmentMetadataFile()` for where it downloads and unpacks/untars the entire segment to get a file used by `sendSegmentUriAndMetadata()`.
@ssubrama: "read" from where? Which component is the reader we are talking about? Controller? Server? Segment pusher?
@ken: We’re talking about the metadata push job runner
@ken: See above, for call path to `SegmentPushUtils.generateSegmentMetadataFile()`
@ssubrama: ah, so the segment pusher. Hopefully the same entity as gthe segment generator. What we could do was during segment generation, copy the metadata file into another area and then read only that one. (and perhaps creation.meta). This has to be done before compressing the segment, of course
@ken: I think performance would be fine to just do what I did above, since those two (small) files occur before the big files in the tarball.
@ssubrama: Do we want to rely on this being the case all the time?
@ken: I’d have to look at the writer code, to see how hard it would be to enforce that constraint
@ssubrama: Also, the file is compressed in to tar.gz, so we may have to download the file and uncompress it anyway?
@ken: No, streaming it works - snippet of code above was successful
@ssubrama: we don't use this in LinkedIn, so if it works for u, great...
@richard892: separate metadata files feels cleaner to me
@ken: Then you have the issue of duplicated data that has to be in sync
@richard892: but that's hardly insurmountable
@ken: No, but a man with one watch always knows what time it is :slightly_smiling_face: My POV is that this is a very simple change that will dramatically improve performance for at least our use case (batch metadata push of many segments), without changing anything about the data layout.
@richard892: if this works, it's a good workaround, just need to be sure there are tests which break if the tarball gets reordered inadvertently
@ssubrama: exactly
#thirdeye-pinot
@angelina.teneva: @angelina.teneva has joined the channel
#getting-started
@sam: @sam has joined the channel
@troy: @troy has joined the channel
--------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@pinot.apache.org For additional commands, e-mail: dev-h...@pinot.apache.org