Hey all,

I have a few more comments on the metadata issue that I brought up. The three things that we lack right now are:

1. Partition count.
2. Partition key.
3. Stream (or message) schema.

These have to be evaluated both on the ingest (first consumers) and egress (final producers) of a query.

On the consumer-side, (1) can be determined simply by asking Kafka. On the producer-side, we have no way to determine how many partitions an output stream should have right now, short of having the user specify it in some way.
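As an illustration of the consumer-side lookup, here's a rough sketch using the Java consumer client's partitionsFor call. The broker address, configs, and topic name are placeholders, not anything we've settled on:

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;

    public class PartitionCountLookup {
      public static void main(String[] args) {
        // Placeholder configs; real values would come from the job's configuration.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          // Ask Kafka for the topic's partition metadata; the list size is the
          // partition count the consumer side of the query needs.
          List<PartitionInfo> partitions = consumer.partitionsFor("stock-trades");
          System.out.println("Partition count: " + partitions.size());
        }
      }
    }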
On the producer-side, (2) must be defined in SQL, or via a hint, for final output to a stream. Intermediate streams can be keyed based on the consumers within the session. On the consumer-side, the consumer can always re-partition an input stream based on the partition key that it needs, so it doesn't need to know a priori what the key was.

Message schemas (3) can be evaluated on the consumer-side at runtime by poking at the incoming messages, which should be self-describing. Output message schemas can be derived on the producer-side from the transformations applied to the incoming messages.

If we're OK with these strategies, then it seems we only really need to worry about:

1. Producer-side partition count.
2. Producer-side partition key.

One other annoying property of (3) is that compile-time validation of a query won't be able to check that a field for a given stream actually exists--it'll fail at runtime. This could perhaps be accelerated by sampling some messages before deploying the job, to check that the messages have the appropriate fields, but it's still a runtime check.
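To make the sampling idea concrete, a pre-deploy check might look something like the sketch below. It assumes self-describing JSON messages and Jackson for parsing; the sample records and field names are made up for illustration:

    import java.util.Arrays;
    import java.util.List;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    /**
     * Pre-deploy sanity check: given a handful of messages sampled from the
     * input stream, verify that every field the query references is present.
     * It doesn't replace the runtime check, it just fails faster.
     */
    public class FieldPresenceCheck {
      private static final ObjectMapper MAPPER = new ObjectMapper();

      /** Returns true if every sampled message contains all required fields. */
      static boolean fieldsPresent(List<String> sampledJson, List<String> requiredFields)
          throws Exception {
        for (String message : sampledJson) {
          JsonNode node = MAPPER.readTree(message);
          for (String field : requiredFields) {
            if (!node.has(field)) {
              System.err.println("Missing field '" + field + "' in: " + message);
              return false;
            }
          }
        }
        return true;
      }

      public static void main(String[] args) throws Exception {
        // Made-up samples resembling the stock-trade records discussed below.
        List<String> samples = Arrays.asList(
            "{\"ticker\":\"ORCL\",\"amount\":10}",
            "{\"ticker\":\"MSFT\",\"amount\":30}");
        System.out.println(fieldsPresent(samples, Arrays.asList("ticker", "amount")));
      }
    }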
Cheers,
Chris

On Fri, Jan 30, 2015 at 10:43 AM, Chris Riccomini <criccom...@apache.org> wrote:
> Hey all,
>
> Just catching up on this thread. The Calcite + Samza approach seems pretty compelling to me. I think most of what Julian is arguing for makes sense. My main concern is with practicalities.
>
> One specific case of this is the discussion about the partitioning model. In an ideal world, I agree, developers wouldn't need to define partitions in SQL. Practically speaking, though, Samza (Kafka, really) currently doesn't have a metadata repository. Without a metadata repository, we have no way of knowing 1) which key a topic is partitioned by, and 2) what the schema of the topic is (without looking at records). We know this *within* a query (or session), but not *between* queries (or sessions) from disjoint users.
>
> One could argue that we should spend time defining such a metadata repository, which is a reasonable argument. But that's also a fairly large effort. I wonder if we might be able to cut some corners in a clean-ish, forwards-compatible way, so that developers don't have to wait for us to fully implement (or integrate with) something like a Hive metadata store.
>
> In person, one thing you mentioned, Julian, was using hints, rather than stuff baked into the syntax. If that's stomach-able, we could support partitioning through hints, until we have a full-blown metadata store.
>
> Thoughts?
>
> Cheers,
> Chris
>
> On Thu, Jan 29, 2015 at 5:10 PM, Julian Hyde <jul...@hydromatic.net> wrote:
>
>> > On Jan 29, 2015, at 4:38 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> >
>> > I am wondering if I can get averages per 30-min window. I.e., the following are the input events in a stream:
>> > {10:01, ORCL, 10, 10}
>> > {10:02, MSFT, 30, 30}
>> > {10:03, ORCL, 100, 110}
>> > {10:17, MSFT, 45, 75}
>> > {10:59, ORCL, 20, 130}
>> > {11:02, ORCL, 50, 50}
>> > Can I get the non-overlapping window averages from 10:00-10:29, 10:30-10:59, ... ? Could you give an example of how to define that window operation in your model? Note that in this use case, although I may have 1 trading record per minute from the stream, I only generate 2 average records from 10:00-11:00.
>>
>> That takes a bit of messy date arithmetic, made even more messy by the fact that you can't regard a SQL timestamp as a "milliseconds" value as one would in C or Java, nor can you divide it. Hence the trick of subtracting a constant timestamp, which yields an interval value, and then doing integer division on that interval to find the number of 30-minute periods since the epoch.
>>
>>   select stream rowtime, ticker, amount,
>>     sum(amount) over (
>>       order by rowtime
>>       partition by ticker,
>>         (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30)
>>   from StockTrades;
>>
>> If I were doing this kind of calculation often I'd define a UDF, or even that user-defined window SPI I mentioned earlier.
>>
>> > {quote}
>> > CREATE TABLE Emp(empno INTEGER, name VARCHAR(20), department VARCHAR(20))
>> >   PARTITION BY HASHCODE (department);
>> > {quote}
>> > That's good! At least it resolves my question on "which field is the partition key". However, I still have a question on the number of partitions. As I stated, since the system currently does not have an "auto-scaling" feature, the number of partitions for a stream has to be explicitly specified. Where do you suggest putting this information w/o breaking SQL syntax?
>>
>> I imagine you could start the system on Tuesday with 10 partitions per stream and restart it on Wednesday with 8 or 12? You wouldn't want to change the SQL, because that's in the application. But you could change the definition of the stream, either in the DDL or by changing some other system configuration. Then the partitioning function (applied by the system to route the record) could, say, take the value of p modulo the current number of partitions.
>>
>> Julian
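A minimal sketch of the routing function Julian describes above (key hash modulo whatever the partition count currently is), with hypothetical names, just to make the idea concrete:

    /**
     * Minimal sketch of the routing Julian describes: pick the partition as
     * the key's hash modulo the stream's current partition count, so the
     * count can change between deployments without touching the SQL.
     */
    public class ModuloPartitioner {
      /** Maps a partition key to a partition in [0, numPartitions). */
      static int partitionFor(Object key, int numPartitions) {
        // Mask the sign bit so a negative hashCode can't yield a negative index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
      }

      public static void main(String[] args) {
        // Same key, different partition counts (Tuesday's 10 vs. Wednesday's 12).
        System.out.println(partitionFor("department-42", 10));
        System.out.println(partitionFor("department-42", 12));
      }
    }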