#general


@sosyalmedya.oguzhan: Can we pass hdfs path to `jobSpecFile` config for reading job spec instead of local path? ```${SPARK_HOME}/bin/spark-submit \\ --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \\ --master "local[2]" \\ --deploy-mode client \\ --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \\ --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \\ local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \\ -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/airlineStats/sparkIngestionJobSpec.yaml``` like; ```-jobSpecFile ```
  @mayanks: Looking at the code `LaunchDataIngestionJobCommand` seems to assume jobSpecFile is local.
  @mayanks: Perhaps we can enhance this. Mind filing an issue?
  @fx19880617: it requires more configs passed to pinot to init hdfs filesystem then read the config file.
  @fx19880617: I feel it’s better to wrapper a script to copy the file from hdfs to local then run it
  @mayanks: Yeah, agree
@kevinv: @kevinv has joined the channel
@kevinv: Hello All, I started looking into Apache Pinot for a company usecase. We would like to read rows from Cassandra tables and insert them into Pinot within Apache Flink. Seems like from what I have read in the documentation so far, I would have to write a Custom Batch Segment Writer. Is there any way I can this without writing a Custom Writer and instead do a push into Pinot, like using JDBC insert statements for example?
  @mayanks: If you can stream output to Kafka from your flink job, Pinot can consume for there
  @kevinv: We don't have Kafka but instead Solace, so I guess I would need to write a custom Stream Ingester since I think Pinot only currently supports Kafka?
  @mayanks: You don’t have to write a custom segment writer. You can also write to a format like orc/avro/parquet and Pinot has utilities to read those
  @kevinv: I would like to avoid having to set up additional infrastructure for hadoop or kafka and use what we currently have which is Solace.
  @mayanks: In that case, another option I can think of is writing a connector for Solace. We have abstracted out realtime stream ingestion api's, so it should be doable to write that connector. FYI, we currently have connectors for flavors of Kafka, and are in the processing of writing one for Kinesis using this abstraction.
  @kevinv: ok I see, thank you
  @g.kishore: we dont a write API for Pinot. There is a Flink sink that is WIP. @yupeng @marta can provide more info here.
  @yupeng: The flink sink is not available now. perhaps you can consider spark
@phillip.fleischer: hey, i am playing with presto/pinot and when doing simple count queries i’m hitting rowcount maximums, anyone know why the aggregation isn’t delegated to pinot?
  @phillip.fleischer: i’m not sure if there’s some kind of hint, but it seems odd that presto would pull in all the rows instead of letting the provider do the aggregation
  @phillip.fleischer: i think maybe i should ask in trino slack channel instead… ignore me for now
  @g.kishore: there is no need for any hint, it should push down aggregation and filter
  @fx19880617: which presto and pinot are you using?
  @g.kishore: ok, trino has some tricks @elon.azoulay can help
  @phillip.fleischer: presto/343
  @phillip.fleischer: pinot i am on “latest” pulled today, not sure the versionn
  @mike.davis: this is an open issue in Trino
  @phillip.fleischer: whoa, nice find!
  @elon.azoulay: Yep, we use it in production here, along w the other pull requests:)
  @mike.davis: you can use dynamic tables as a hint: ```SELECT * FROM pinot.default."SELECT SELECT MAX(col1), COUNT(col2) FROM pinot_table GROUP BY col3, col4";```
  @phillip.fleischer: ooh, wow, that at least will give me some idea how it might work when it’s fixed :slightly_smiling_face:
  @phillip.fleischer: @elon.azoulay do you have your own fork, image, build? is that something you might recommend?
  @phillip.fleischer: and thanks for the quick feedback folks, very helpful
  @elon.azoulay: It might be safer to use the pull requests on github, let me think on that.
  @elon.azoulay: This all should be merged soon though
@mike.davis: When generating offline segments is there any recommendations around target segment size?
  @g.kishore: 100 to 500 mb
@zhangmingfeng1982: @zhangmingfeng1982 has joined the channel
@garystaf: @garystaf has joined the channel
@coolhongly: @coolhongly has joined the channel

#random


@kevinv: @kevinv has joined the channel
@zhangmingfeng1982: @zhangmingfeng1982 has joined the channel
@garystaf: @garystaf has joined the channel
@coolhongly: @coolhongly has joined the channel

#feat-text-search


@rajasekhar.m: @rajasekhar.m has joined the channel
@ravikumar.m: @ravikumar.m has joined the channel

#feat-presto-connector


@rajasekhar.m: @rajasekhar.m has joined the channel
@ravikumar.m: @ravikumar.m has joined the channel

#troubleshooting


@kevinv: @kevinv has joined the channel
@ken: Has anyone been successful with using Superset and Pinot, when following the Superset manual installation steps at ? I finally got Superset running after some manual fixup, but when trying to connect Pinot as a database I get `Can't load plugin: sqlalchemy.dialects:http`, which makes me think my install is borked. And yes, I know Docker is the preferred solution here :slightly_smiling_face:
  @fx19880617: did you pip install pinotdb?
  @fx19880617: I only tried build docker image on top of superset official image
  @fx19880617:
  @ken: Thanks @fx19880617 - I guess I’ll give the Dockerfile a try next
  @fx19880617: It’s based on an existing docker image, you may need to check how superset is packaging their image there
@zhangmingfeng1982: @zhangmingfeng1982 has joined the channel
@coolhongly: @coolhongly has joined the channel

#pinot-dev


@rajasekhar.m: @rajasekhar.m has joined the channel
@ravikumar.m: @ravikumar.m has joined the channel

#getting-started


@kevinv: @kevinv has joined the channel

#fix-numerical-predicate


@amrish.k.lal: @amrish.k.lal has joined the channel
@amrish.k.lal: @amrish.k.lal set the channel description: PR:
@jackie.jxt: @jackie.jxt has joined the channel
@mayanks: @mayanks has joined the channel
@steotia: @steotia has joined the channel
@amrish.k.lal: Hi Folks, this is to discuss the PR . I think I have had some discussion with all of you regarding this. So summarizing things below
@amrish.k.lal: *SUMMARY* *Q1: Why does the query “SELECT * from table WHERE yearId > 2000.0” fail.* The query fails in the path that involves doing a binary search over yearId dictionary. The query fails in ColumnValueSegmentPruner.convertValue due to NumberFormatException. If this is fixed, then the query will fail in IntDictionary.insertionIndexOf function SortedDictionaryBasedRangePredicateEvaluator with same exception. If both are fixed, then we have to deal with query correctness issues and this is where the bulk of this PR (and rewrites) come in. *Q2. Why does the query “SELECT * from table WHERE yearId > 2010 - 10” fail.* On the broker side, Calcite will call ArithmeticFunctions.minus which takes input parameters of type double and produces double as result. So broker rewrites predicate to *“yearId > 2000.0”* before sending the query to Server. This query basically reduces to the first query and fails for the same reason. *Q3: Why doesn’t the query “SELECT * FROM table WHERE yearId > yearId - 10.0” fail? It compares different numerical types too.* Broker rewrites this query to “SELECT * FROM table WHERE minus(yearId, minus(yearId, 10.0)) > 0” (query algebra rewrite without taking into account metadata or cost information). Server does a full column scan due to presence of functions in predicate instead of using binary search over dictionary column and hence the evaluation path is different so we don’t hit the type conversion issue. Note that scalar operators such as minus take double as input argument and produce double as output, this takes care of type conversion for this particular query. *Q4. Can we overload the functions in ArithmeticFunctions to take different datatypes as input and produce right output datatype?* We abandoned this idea because it will require writing each scalar function that takes numerical parameter 16 times for all combinations of double, float, int, and long. We also considered rewriting functions using BigInteger, but that would requiring rewriting all the scalar functions and also overhauling the function invocation code. It will also introduce backward incompatibility because certain calculations that were working earlier won’t work anymore and vice versa (overflow, underflow issues, etc). We thought this would create more issues than solving. *Q5: Can the rewrites be done on the Broker side* All the optimizations and rewrites that are done on the Broker side seem to be done purely at SQL syntax tree level. For example “WHERE 10.0 < col1” will get rewritten to “WHERE col1 > 10.0”, nested predicates expressions will get flattened, etc. As far as I could see, there is no existing hooks or plumbing in place, on the broker side, for rewriting the query based on table schema, column types, other metadata, or runtime information. To add this rewrites on the Broker side, would involve scanning every query for some new patterns and then doing the rewrite after loading the metadata and type information.  On the server side, all the hooks and framework for such rewrites already exist and hence these rewrites easily fit on the server side. Note that the predicate rewrite on the server side is done only once irrespective of whether there is one value or a billion values in the column. For example, if RangePredicate exists we do RangePredicate specific rewrite. If it doesn’t exist then we don’t do the rewrite. Any additional compile time cost here only applies to those queries where these specific predicates are found. Doing this on the Broker side will mean scanning each and every query to see whether these patterns exist and then do the rewrite.
@amrish.k.lal: @jackie.jxt I verified most of the above through debugger, but if we need to check or verify anything else let me know. As I mentioned earlier, a lot of this was discussed with @mayanks earlier.
@jackie.jxt: The statement for broker side rewrite is not true. Check `FilterOptimizer` interface, all the required info are there
@jackie.jxt: We should not perform this rewrite per segment. Per server is okay, but worse than per query
@amrish.k.lal: Right, I looked at `FilterOptimizer`, it only manipulates query algebra without considering schema or type information. As far as I could see, the rewrites on the Broker side don't even consider whether table or column exists. If the framework had already existed, then I would have been happy to put the rewrites on the Broker side, but I am not seeing that framework that would enable these rewrites easily.
@amrish.k.lal: Sorry not sure what you mean "per segment"?
@jackie.jxt: In the current implementation, the rewrite is triggered in `PredicateEvaluatorProvider.getPredicateEvaluator()`, which is called in segment planning phase
@jackie.jxt: So you will perform this rewrite on each segment
@jackie.jxt: Basically if the query hits 1000 segments, you will do this rewrite 1000 times
@jackie.jxt: Here is the interface signature, where the schema is passed in ``` /** * Optimizes the given filter, returns the optimized filter. */ FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema); /** * Optimizes the given filter, returns the optimized filter. */ _expression_ optimize(_expression_ filterExpression, @Nullable Schema schema);```
@amrish.k.lal: Hmm, Will need to check. I only have one segment in my local instance.
@jackie.jxt: Think of it this way, if we do it on broker side, we only rewrite the query once. If we do it on server side, we potentially need to rewrite the query 10+ (number of servers hit) times. If we do it per segment, then potentially 1000+ times
@jackie.jxt: We need to do a lot of parsing using `BigDecimal` for the rewrite, so it is definitely not cheap
@amrish.k.lal: I need to check the segment part, but we do the rewrite once in each server right?
@jackie.jxt: Also, the per-value extra `_precomputed` flag check could also hurt the performance
@jackie.jxt: No, it is in the planning code, which is done on each segment
@amrish.k.lal: So if we do this under `FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema)`, then basically we scan every query that comes in for the pattern that needs to be rewritten right? On the server side RangePredicate specific rewrite is only done when range predicate exists.
@jackie.jxt: We need to rewrite `EQ`, `NOT_EQ`, `IN`, `NOT_IN`, `RANGE`
@jackie.jxt: Actually the rewrite has to happen on broker side, or `MergeRangeFilterOptimizer` will already fail if you have 2 range filters on the same column
@jackie.jxt: You can try this query on your branch: `select * from table where intCol > 2.0 and intCol < 5.0`
@amrish.k.lal: Yes, I guess we agree that these need to be rewritten, but need to decide whether on server or broker. Personally I don't see one if check for precomputed to be a major performance issue specially since the if check is limited to only these predicates. But let me check the Broker side.
@amrish.k.lal: This query works with rewrites in the PR: `select * from table where intCol > 2.0 and intCol < 5.0` .
@amrish.k.lal: It will fail without rewrites
@jackie.jxt: How do you test it? Do you have schema uploaded?
@jackie.jxt: The query should fail on the broker side if you have schema uploaded
@amrish.k.lal: I am testing it against baseballStats table through QuickStart.
@amrish.k.lal: let me try just a sec
@jackie.jxt: `select * from baseballStats where yearID > 1.0 and yearID > 2.0 limit 10`
@amrish.k.lal: `select * from baseballStats where yearID > 2000.0 AND yearID < 2100.0` is failing without hitting the server.
@amrish.k.lal: without the decimal points, it works though.
@jackie.jxt: The rewrite should be performed before the `MergeRangeFilterOptimizer`
@jackie.jxt: Other `FilterOptimizer` also rewrites the values within the filter, please take a look
@amrish.k.lal: Where does it fail right now?
@jackie.jxt: When getting the range bound value in the `MergeRangeFilterOptimizer`
@amrish.k.lal: ok
@jackie.jxt: @mayanks @steotia wdyt?
@steotia: I am immersed in an oncall thing. Will catch up here in a bit
@jackie.jxt: One thing is missing right now is that we cannot short-circuit the always false predicate
  @amrish.k.lal: This was just an optimization
  @jackie.jxt: For always false, we need to put a place holder filter to represent that though
@amrish.k.lal: So to do rewrites on the Broker side, we will extend from `FilterOptimizer` and then do the specific rewrites there? `MergeRangeFilterOptimizer` seems to be specific to merging range predicates?
@jackie.jxt: In such case, broker should not route the query but should directly return
@jackie.jxt: Yes, you can add another implementation of the `FilterOptimizer` to rewrite the values within the predicates
@steotia: Overall I agree with doing this once per query and not per segment. Infact when we moved to Query context on the server Jackie had made changes to do _expression_ compilation for filter tree from once per segment to once per server. So let's stick to that
@amrish.k.lal: I am ok with that approach, the only reason I put it on the server side was because it seemed straightforward at the time, but this doesn't look bad either and we have the type information in Schema. We can move the rewrites from Server to Broker and see how that works.
@jackie.jxt: Hmm.. There might be another problem regarding that rewrite though. If the lhs of the predicate is an _expression_, we cannot get the type information from the schema
@amrish.k.lal: I was playing around with "*SELECT * FROM table WHERE yearId > yearId - 10.0"*. It gets rewritten by the broker to *SELECT * FROM table WHERE minus(yearId, minus(yearId, 10.0)) > 0*
@jackie.jxt: Hmm, then it works
@amrish.k.lal: On the server side minus always returns double, so this basically boils down to a rewrite that would adapt the RHS to double.
@amrish.k.lal: We can assume that scalar functions always return double on the Broker side?
@steotia: Does it get rewritten to minus function or SUB transform function?
@jackie.jxt: We can at least rewrite whole numbers to format without decimal point
@jackie.jxt: I can still see some query might fail, but that should be super rare
@amrish.k.lal: Hmm, that would still leave out a lot of queries right? Seems like all scalar functions that operate on numerical values return double, so can we assume that Function on LHS will always return double?
@jackie.jxt: No, it can return different types
@steotia: Why can't we write a template and at compile time generate functions for multiple combinations
@steotia: Combinations are not a lot here
@jackie.jxt: If we always rewrite whole numbers, at least user can use `floor` and `ceil` to convert the values
@amrish.k.lal: I discussed this a bit with Mayank. It basically lead to 16 overloaded functions for minus operator. Plus Calcite already parses numerical values as BigDecimal, so the other option was to overhaul the function invocation code to work with BigInteger arguments.
@jackie.jxt: One simple way to unblock the `now() - 10000` query is to change the return value of `ceil(double a)` and `floor(double a)` (we should also add a `round(double a)`) to `long`, and user can put `floor(now() - 10000)`
@amrish.k.lal: Right, but that is almost hardcoded to this specific case. At some point, this would again come back in a different form and we would then need to consider a more general approach.
@jackie.jxt: Yeah, but at least that can unblock the users (they got a way to make the query)
@steotia: User can also do cast(now() - 10000) as long
@jackie.jxt: They can also use `time + 10000 > now()`, but it has performance impact
@jackie.jxt: We don't have cast as scalar function
@jackie.jxt: But yes, we can add that
@steotia: Recently someone else in open source ran into the same problem and cast solved their issue.
@steotia: I remember discussing this in general
@jackie.jxt: They can put `cast()` around columns and expressions, but not literals
@amrish.k.lal: Personally, I would favor a generalized fix :slightly_smiling_face: otherwise, it seems like the issue will keep coming up in one way or another.
@steotia: Function overloading will give us a generalized fix right. We can do that in a later PR. Need not write everything manually. Templates can be used to generate the class at compile time.
@amrish.k.lal: One option may be to handle it for cases of `column <operator> literal` and leave the more complex case where LHS is a function _expression_ and then tackle that problem at some better time in future? Function overloading will involve overloading `minus` function 16 times and same with other numerical scalar functions.
@amrish.k.lal: @jackie.jxt any thoughts on just doing the rewrites for `column <operator> literal` that should be generic enough and take care of the use-cases encountered earlier?
@jackie.jxt: Yes, for now we can perform the rewrite only for column LHS, similar to what we did in `MergeRangeFilterOptimizer`. That should be able to fix most of the problems
@amrish.k.lal: Sounds good and then perhaps later we can consider ways to make it more generic to handle cases such as function(column) > 5.4 etc :slightly_smiling_face:
@amrish.k.lal: Also, if there is a way to generate overloaded functions for different type parameters at compile time through templates, it would be good to look into that at some point.
@amrish.k.lal: I am on time off for next week, but will create a PR for broker side rewrite for `column <operator> literal` :slightly_smiling_face:
@jackie.jxt: Sounds good, thanks!
@jackie.jxt: We can revisit the function overload in the future. That won't be an easy change
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

Reply via email to