Chris, I think the schema repository acts as the stand-in for the database catalog or HCatalog. People who don't have that will have to supply their schema with the query. I think that can be some kind of plugin that provides the schema, so it is automatically inferred for the Avro people and manually provided for others.
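A minimal sketch of what such a pluggable schema provider might look like. This is purely illustrative; `SchemaProvider` and the two implementations are hypothetical names, not an existing Samza or Calcite API, and the schema repository is stubbed with a map:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical SPI: resolves a stream's schema either automatically
// (e.g. from an Avro schema repository) or from user-supplied DDL.
interface SchemaProvider {
    // Returns the schema for a stream, if this provider can supply one.
    Optional<String> schemaFor(String streamName);
}

// Inferred path: backed by a schema repository (stubbed here with a map).
class RepositorySchemaProvider implements SchemaProvider {
    private final Map<String, String> repo;

    RepositorySchemaProvider(Map<String, String> repo) {
        this.repo = repo;
    }

    @Override
    public Optional<String> schemaFor(String streamName) {
        return Optional.ofNullable(repo.get(streamName));
    }
}

// Manual path: the user declares the schema alongside the query.
class QuerySuppliedSchemaProvider implements SchemaProvider {
    private final Map<String, String> declared;

    QuerySuppliedSchemaProvider(Map<String, String> declared) {
        this.declared = declared;
    }

    @Override
    public Optional<String> schemaFor(String streamName) {
        return Optional.ofNullable(declared.get(streamName));
    }
}
```

Either implementation can sit behind the same interface, so the query planner doesn't care whether the schema was inferred or hand-declared.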
-Jay

On Fri, Jan 30, 2015 at 12:06 PM, Chris Riccomini <criccom...@apache.org> wrote:
> Hey all,
>
> I have a few more comments on the metadata issue that I brought up. The
> three things that we lack right now are:
>
> 1. Partition count.
> 2. Partition key.
> 3. Stream (or message) schema.
>
> These have to be evaluated both on the ingest (first consumers) and egress
> (final producers) of a query.
>
> On the consumer side, (1) can be determined simply by asking Kafka. On the
> producer side, we have no way to determine how many partitions an output
> stream should have right now, short of having the user specify it in some
> way.
>
> On the producer side, (2) must be defined in SQL, or via a hint, for final
> output to a stream. Intermediate streams can be keyed based on the
> consumers within the session. On the consumer side, the consumer can
> always re-partition an input stream based on the partition key that it
> needs, so it doesn't need to know a priori what the key was.
>
> Message schemas (3) can be evaluated on the consumer side at runtime by
> poking at the incoming messages, which should be self-describing. Output
> message schemas can be derived from the transformations of incoming
> messages on the producer side.
>
> If we're OK with these strategies, then it seems we only really need to
> worry about:
>
> 1. Producer-side partition count.
> 2. Producer-side partition key.
>
> One other annoying property of (3) is that compile-time validation of a
> query won't be able to check that a field for a given stream actually
> exists--it'll fail at runtime. This could perhaps be accelerated by
> sampling some messages before deploying the job, to check that the
> messages have the appropriate fields, but it's still a runtime check.
>
> Cheers,
> Chris
>
> On Fri, Jan 30, 2015 at 10:43 AM, Chris Riccomini <criccom...@apache.org>
> wrote:
>
> > Hey all,
> >
> > Just catching up on this thread.
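The pre-deploy sampling check Chris describes could be sketched roughly as follows. This is an assumption about how such a check might look, not an existing API; messages are modeled as simple maps for illustration:

```java
import java.util.List;
import java.util.Map;

// Sketch of a pre-deploy check: sample a few messages from the input
// stream and verify every field the query references is present, before
// submitting the job. A pass does not guarantee later messages conform,
// so this only accelerates failure -- it is still a runtime check.
class FieldSampler {
    // Returns true if every sampled message contains all required fields.
    static boolean validate(List<Map<String, Object>> sample, List<String> fields) {
        for (Map<String, Object> msg : sample) {
            for (String field : fields) {
                if (!msg.containsKey(field)) {
                    return false;
                }
            }
        }
        return true;
    }
}
```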
> > The Calcite + Samza approach seems pretty
> > compelling to me. I think most of what Julian is arguing for makes
> > sense. My main concern is with practicalities.
> >
> > One specific case of this is the discussion about the partitioning
> > model. In an ideal world, I agree, developers wouldn't need to define
> > partitions in SQL. Practically speaking, though, Samza (Kafka, really)
> > currently doesn't have a metadata repository. Without a metadata
> > repository, we have no way of knowing 1) which key a topic is
> > partitioned by, and 2) what the schema of the topic is (without looking
> > at records). We know this *within* a query (or session), but not
> > *between* queries (or sessions) from disjoint users.
> >
> > One could argue that we should spend time defining such a metadata
> > repository, which is a reasonable argument. But that's also a fairly
> > large effort. I wonder if we might be able to cut some corners in a
> > clean-ish, forwards-compatible way, so that developers don't have to
> > wait for us to fully implement (or integrate with) something like a
> > Hive metadata store.
> >
> > In person, one thing you mentioned, Julian, was using hints, rather
> > than stuff baked into the syntax. If that's stomach-able, we could
> > support partitioning through hints until we have a full-blown metadata
> > store.
> >
> > Thoughts?
> >
> > Cheers,
> > Chris
> >
> > On Thu, Jan 29, 2015 at 5:10 PM, Julian Hyde <jul...@hydromatic.net>
> > wrote:
> >
> >>
> >> > On Jan 29, 2015, at 4:38 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >> >
> >> > I am wondering if I can get an average per 30-minute window?
> >> > I.e. the following are the input events in a stream:
> >> > {10:01, ORCL, 10, 10}
> >> > {10:02, MSFT, 30, 30}
> >> > {10:03, ORCL, 100, 110}
> >> > {10:17, MSFT, 45, 75}
> >> > {10:59, ORCL, 20, 130}
> >> > {11:02, ORCL, 50, 50}
> >> > Can I get the non-overlapping window average from 10:00-10:29, and
> >> > 10:30-10:59, ... ?
> >> > Could you give an example of how to define that window
> >> > operation in your model? Note that in this use case, although I may
> >> > have 1 trading record per minute from the stream, I only generate 2
> >> > average records from 10:00-11:00.
> >>
> >> That takes a bit of messy date arithmetic, made even messier by the
> >> fact that you can't regard a SQL timestamp as a "milliseconds" value
> >> as one would in C or Java, nor can you divide it. Hence the trick of
> >> subtracting a constant timestamp, which yields an interval value, and
> >> then doing integer division on that interval to find the number of
> >> 30-minute periods since the epoch.
> >>
> >>   select stream rowtime, ticker, amount,
> >>     sum(amount) over (
> >>       partition by ticker,
> >>         (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30
> >>       order by rowtime)
> >>   from StockTrades;
> >>
> >> If I were doing this kind of calculation often I'd define a UDF, or
> >> even that user-defined window SPI I mentioned earlier.
> >>
> >> > {quote}
> >> > CREATE TABLE Emp(empno INTEGER, name VARCHAR(20),
> >> >   department VARCHAR(20))
> >> > PARTITION BY HASHCODE (department);
> >> > {quote}
> >> > That's good! At least it resolves my question on "which field is
> >> > the partition key". However, I still have the question on the number
> >> > of partitions. As I stated, since the system currently does not have
> >> > an "auto-scaling" feature, the number of partitions for a stream has
> >> > to be explicitly specified. Where do you suggest putting this
> >> > information w/o breaking SQL syntax?
> >>
> >> I imagine you could start the system on Tuesday with 10 partitions per
> >> stream and restart it on Wednesday with 8 or 12? You wouldn't want to
> >> change the SQL, because that's in the application. But you could
> >> change the definition of the stream, either via the DDL or by changing
> >> some other system configuration.
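In millisecond terms, Julian's interval-division trick reduces to integer-dividing the epoch offset by the window width. A small sketch of the same arithmetic (the class and method names are illustrative, not part of any library):

```java
// The same bucketing the SQL above performs, expressed directly on epoch
// milliseconds: integer division by the 30-minute window width yields the
// number of complete 30-minute periods since the epoch, which serves as
// the secondary partition key. Events in the same half-hour share a bucket.
class WindowBucket {
    static final long THIRTY_MINUTES_MS = 30L * 60 * 1000;

    static long bucketOf(long epochMillis) {
        return epochMillis / THIRTY_MINUTES_MS;
    }
}
```

For the example stream, the 10:01 and 10:17 events land in the same bucket, while 10:59 and 11:02 land in different ones, giving the non-overlapping 30-minute windows Yi asked about.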
> >> The partitioning function (applied by the system to route the record)
> >> could, say, take the value of p modulo the current number of streams.
> >>
> >> Julian
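The routing Julian describes might be sketched like this, assuming the partition-key value is hashed and taken modulo the current partition count. The names here are hypothetical, and the count is read from stream configuration at startup rather than from the SQL, which is what lets it change between restarts:

```java
// Illustrative modulo router: hash the partition key and reduce it modulo
// the partition count in effect for this run. Because the count lives in
// stream configuration rather than in the SQL, restarting the system with
// 8 or 12 partitions requires no change to the query.
class ModuloRouter {
    static int route(Object partitionKey, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }
}
```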