I'm suggesting that $recordIdentifier be a new virtual field similar to dir0. I think that should be the case no matter what approach we take.
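To make that idea concrete, here is a minimal sketch of what a reader-produced provenance column could look like, in the spirit of dir0. Everything in it (the class names, the path:offset encoding, the hypothetical query in the comment) is an illustrative assumption, not an actual Drill API.

// Hypothetical sketch: how a record reader might populate a
// $recordIdentifier virtual column only when the query asks for it.
// None of these class or method names are real Drill APIs.
import java.util.ArrayList;
import java.util.List;

public class RecordIdentifierSketch {

  /** Provenance for one record: where it came from within the scanned source. */
  static final class RecordIdentifier {
    final String filePath;   // source file (or region/block for other storage)
    final long offset;       // byte offset or row index within that file

    RecordIdentifier(String filePath, long offset) {
      this.filePath = filePath;
      this.offset = offset;
    }

    @Override
    public String toString() {
      return filePath + ":" + offset;
    }
  }

  /**
   * Emulates a reader producing one batch. The virtual column is materialized
   * only if the projection requested it, mirroring how dir0 is produced on demand.
   */
  static List<String> readBatch(String filePath, int rows, boolean identifierRequested) {
    List<String> identifierColumn = new ArrayList<>();
    for (int row = 0; row < rows; row++) {
      if (identifierRequested) {
        identifierColumn.add(new RecordIdentifier(filePath, row).toString());
      }
      // ... populate the regular data columns here ...
    }
    return identifierColumn;
  }

  public static void main(String[] args) {
    // A query like: SELECT `$recordIdentifier`, a FROM dfs.`/data/foo.json`
    // would set identifierRequested = true; otherwise no extra work is done.
    System.out.println(readBatch("/data/foo.json", 3, true));
  }
}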
On Dec 13, 2015 10:52 PM, "Hsuan Yi Chu" <[email protected]> wrote:
> I think I missed one thing in the second solution.
>
> Drill needs to report the locations of the source of skipped records, which are known only at Scan. It seems hop-by-hop is needed to carry that information.
>
> On Sun, Dec 13, 2015 at 4:36 PM, Jacques Nadeau <[email protected]> wrote:
>
> > If your goal is early termination, sending the messages back as quickly as possible to the Screen or similar centralized operator will allow you to respond quickly. Remember that there will likely be many fragments executing in parallel.
> >
> > --
> > Jacques Nadeau
> > CTO and Co-Founder, Dremio
> >
> > On Sun, Dec 13, 2015 at 2:46 PM, Hsuan Yi Chu <[email protected]> wrote:
> >
> > > On Sun, Dec 13, 2015 at 9:15 AM, Jacques Nadeau <[email protected]> wrote:
> > >
> > > > You seem to be mixing multiple things in your response.
> > > >
> > > > - Why do you say this is complex? It is very simple. Is it because you don't know how it would be implemented? I'm offering to do the vast majority of the work to implement the framework, so you shouldn't use that as a gauge.
> > > > - It is designed to provide for multiple different use cases, not just your own. As such, you should expect it to be more general. There is clearly a need to provide these messages in directions other than straight up the operator tree. There is also a need to provide sideband messages outside the context of a record batch. (We shouldn't be creating fake empty record batches just to send sideband messages; that caused us problems before on the UserRpc, and I don't think we should compound purposes.)
> > > > - You should evaluate whether it would solve the use case you presented. I believe it will.
> > > >
> > > > As far as your proposed implementation goes: I think you are confounding communication with the user with traversal of the operator tree. I would assume that each operator may be able to skip records. When you accumulate that information, you would want to know how many skips there were for each operator. The info might look like:
> > > >
> > > > skips: [
> > > >   { op: 1:1:1, records: [123,456,789]}
> > > >   { op: 1:2:1, records: [123,456,789]}
> > > >   { op: 1:1:2, records: [123,456,789]}
> > > >   { op: 1:2:2, records: [123,456,789]}
> > > > ]
> > > >
> > > > In this case, there is no need for operator 1:1:1 to know about operator 1:1:2's skips. It shouldn't even need to manage or move that data. So I believe your requirements are actually to provide a stream of skip records to a separate writer that should be on the edge of the plan. The more I talk through this, the more I wonder whether sideband messages should take the same shape as a separate record batch and whether we need to provide a separate subtree/fragment for this purpose. Sideband in that case would be a tee in the plan.
> > >
> > > For the case of skipping records, we will have a threshold, which defines the bound on the number of skipped records before Drill fails the query. Thus, if operator 1:1:1 can be informed of how many records have been skipped in the upstream operators, we could fail the query earlier.
> > >
> > > With this in mind, we could have two solutions for failing the query earlier:
> > > 1. Let the sideband message hop from upstream to downstream. On the way, each operator determines whether to fail the query if the threshold is exceeded.
> > > 2. While each operator works independently, the sideband sink operator would be the only one that knows how many records have been skipped. Once the threshold is exceeded, this sink operator is responsible for stopping the query (via another sideband message to inform the foreman).
> > >
> > > When I read the proposal, I was thinking about the first solution. Certainly, the second one seems to leverage sideband better.
> > >
> > > > For example, imagine this tree:
> > > >
> > > > https://docs.google.com/drawings/d/19w7lbpnajsmQPUqzxlb2JP2jr6MsGU9RDOmQO1N05uU/edit
> > > >
> > > > As you can see, I believe that the vast majority of the issues you want to manage with your skip-record design can be handled by providing a couple of simple tools: sideband, a sideband sink operator (basically a custom version of the union receiver), and an enhancement to the Screen operator to support a secondary incoming stream with a defined schema that will be transformed into a set of warnings (this also allows fine-grained warnings, or an aggregate in the secondary tree for aggregate warnings).
> > > >
> > > > The key goal here is to avoid introducing new or more complicated interfaces at the execution layer and instead use the logical layer to manage things. I believe this also extends to the concept of $recordIdentifier (or similar). This should simply be a virtual field produced by all record readers (when requested) that includes relevant provenance information. If you want to know which records are problematic, ask for the identifier and then record them in a separate file. Basically, let's use the highly efficient infrastructure we already have to do new things rather than implementing a new set of classes and concepts.
> > > >
> > > > --
> > > > Jacques Nadeau
> > > > CTO and Co-Founder, Dremio
> > > >
> > > > On Fri, Dec 11, 2015 at 1:16 PM, Hsuan Yi Chu <[email protected]> wrote:
> > > >
> > > > > The design scope is very general, but, for the applications we are thinking about now, it is a bit complex and will make the solutions a little indirect. In particular, "data to be sent between any two three-coordinate locations" implies sideband data can teleport anywhere? That is a bit too involved. And even for advanced pushdown, it is not necessary to be that flexible for communications.
> > > > >
> > > > > My original picture of "sideband" is that the additional information should be "associated with" a RecordBatch. That means this additional information should be attached to a particular RecordBatch and cannot travel on its own.
> > > > >
> > > > > As the RecordBatch flows from upstream to downstream, an operator can optionally access or update the sideband message. For example, in the application of record-skipping, an operator can see how many records were skipped so far and increment the count if more are skipped.
> > > > > If we go with this design, the place we need to change is on the receiver side, which needs to decode the sideband info from the incoming buffers.
> > > > >
> > > > > On Tue, Dec 8, 2015 at 7:10 PM, Jacques Nadeau <[email protected]> wrote:
> > > > >
> > > > > > inline
> > > > > >
> > > > > > > It seems that SidebandTunnel is point-to-point. That is, there is one producer and one consumer. No broadcast or topics (multiple consumers of the same message). Order is preserved. At-most-once (i.e. may lose data in event of failure). Producer and consumer may be on the same node or different nodes. Correct?
> > > > > >
> > > > > > Yes, you are correct in all of this. Since we don't use UDP in Drill, we do broadcast as a collection of individual p2p calls, all using the same message (and multiple reference counts if using raw bytes).
> > > > > >
> > > > > > > I'm not sure SidebandTunnel.close is necessary. I would presume that a SidebandTunnel is closed when its associated statement is closed, and only then.
> > > > > >
> > > > > > I started without it. My thought was that we may need to signal that you've gotten all of a sideband stream prior to the close of a particular fragment. If I'm downstream of an operation reporting multiple skips, I may want to hold off on reporting to the user until I have gotten all of the messages. One option is for the sender to send a discrete message via the tunnel close. The other option is an implicit message when the fragment is completed. I like the latter from a cleanliness perspective but think the former may be required. I'm OK with not exposing this publicly at the tunnel level initially, and we can always expose it later. I would love to hear whether people think there is going to be a need/use case to continue fragment operation but have another operator know that a sideband stream is complete. Maybe when sending a downstream set of samples on the first 1mm records of a larger scan?
> > > > > >
> > > > > > > Also, would it be easier if the tunnels were defined as part of the DAG, and DAG initialization time was the only time that they could be created?
> > > > > >
> > > > > > That is a really good question. I need to think about it a bit. I'm not sure it is easier, given my initial proposal is to piggy-back on the DataTunnel (which is independent of DAG initialization). However, it might be cleaner if operators have to declare this relationship at initialization time and it is all managed 'outside'.
> > > > > >
> > > > > > Thanks for the feedback. Will need to think further on your last point especially.
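As a rough illustration of the semantics discussed in the exchange above (point-to-point, one producer and one consumer, order preserved, at-most-once delivery, an optional explicit close, and broadcast expressed as repeated p2p sends), here is a sketch. All names are hypothetical; they are not taken from the gist or from Drill's actual RPC classes.

// Sketch only: the interface captures the discussed tunnel semantics, and
// LocalTunnel is a trivial in-process stand-in to make them concrete.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SidebandTunnelSketch {

  /** One producer, one consumer; order preserved; delivery is at-most-once. */
  interface SidebandTunnel<M> {
    /** Send a message toward the single consumer; it may be lost on failure. */
    void send(M message);

    /**
     * Optional explicit end-of-stream marker. The alternative discussed above
     * is an implicit signal when the sending fragment completes.
     */
    void close();
  }

  /** Trivial in-process implementation, just to make the semantics runnable. */
  static final class LocalTunnel<M> implements SidebandTunnel<M> {
    private final Consumer<M> consumer;
    private final Runnable onClose;
    private boolean closed;

    LocalTunnel(Consumer<M> consumer, Runnable onClose) {
      this.consumer = consumer;
      this.onClose = onClose;
    }

    @Override public void send(M message) {
      if (!closed) {
        consumer.accept(message);   // ordered, best-effort delivery
      }
    }

    @Override public void close() {
      closed = true;
      onClose.run();                // explicit "stream complete" signal
    }
  }

  /** Broadcast expressed as a collection of individual point-to-point calls. */
  static <M> void broadcast(List<SidebandTunnel<M>> tunnels, M message) {
    for (SidebandTunnel<M> tunnel : tunnels) {
      tunnel.send(message);         // same message reused for every destination
    }
  }

  public static void main(String[] args) {
    List<SidebandTunnel<String>> tunnels = new ArrayList<>();
    tunnels.add(new LocalTunnel<>(m -> System.out.println("consumer A got: " + m),
        () -> System.out.println("stream to A complete")));
    tunnels.add(new LocalTunnel<>(m -> System.out.println("consumer B got: " + m),
        () -> System.out.println("stream to B complete")));

    broadcast(tunnels, "skipped 3 records in fragment 1:1");
    tunnels.forEach(SidebandTunnel::close);
  }
}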
> > > > > > > Julian
> > > > > > >
> > > > > > > > On Dec 8, 2015, at 11:00 AM, Jacques Nadeau <[email protected]> wrote:
> > > > > > > >
> > > > > > > > Please see some initial thoughts attached. Would love feedback and thoughts from others on how we can shape this.
> > > > > > > >
> > > > > > > > https://gist.github.com/jacques-n/84b13e704e0e3829ca99
> > > > > > > >
> > > > > > > > --
> > > > > > > > Jacques Nadeau
> > > > > > > > CTO and Co-Founder, Dremio
> > > > > > > >
> > > > > > > > On Thu, Dec 3, 2015 at 8:17 AM, Zelaine Fong <[email protected]> wrote:
> > > > > > > >
> > > > > > > >> Yes, it would be great to get your thoughts so we can assess the scope of what's involved.
> > > > > > > >>
> > > > > > > >> Thanks.
> > > > > > > >>
> > > > > > > >> -- Zelaine
> > > > > > > >>
> > > > > > > >> On Wed, Dec 2, 2015 at 7:29 PM, Jacques Nadeau <[email protected]> wrote:
> > > > > > > >>
> > > > > > > >>> Definitely agree that we shouldn't boil the ocean. That said, I don't think we should make RecordBatch interface changes without deliberate design. Same for RPC protocol changes. Part of my internal struggle with the warning patch is exactly this lack of broader design. I think this is especially true given the drive to support backwards compatibility.
> > > > > > > >>>
> > > > > > > >>> I don't think we're talking about a massive undertaking. I'll try to write up some thoughts later this week to get the ball rolling. Sound good?
> > > > > > > >>>
> > > > > > > >>> --
> > > > > > > >>> Jacques Nadeau
> > > > > > > >>> CTO and Co-Founder, Dremio
> > > > > > > >>>
> > > > > > > >>> +1 on having a framework.
> > > > > > > >>> OTOH, as with the warnings implementation, we might want to go ahead with a simpler implementation while we get a more generic framework design in place.
> > > > > > > >>>
> > > > > > > >>> Jacques, do you have any preliminary thoughts on the framework?
> > > > > > > >>>
> > > > > > > >>> On Tue, Dec 1, 2015 at 2:08 PM, Julian Hyde <[email protected]> wrote:
> > > > > > > >>>
> > > > > > > >>>> +1 for a sideband mechanism.
> > > > > > > >>>>
> > > > > > > >>>> Sideband can also allow correlated restart of sub-queries.
> > > > > > > >>>>
> > > > > > > >>>> In the sideband use cases you described, the messages ran in the opposite direction to the data. Would the sideband also run in the same direction as the data? If so it could carry warnings, rejected rows, progress indications, and (for online aggregation [1]) notifications that a better approximate query result is available.
> > > > > > > >>>>
> > > > > > > >>>> Julian
> > > > > > > >>>>
> > > > > > > >>>> [1] https://en.wikipedia.org/wiki/Online_aggregation
> > > > > > > >>>>
> > > > > > > >>>>> On Dec 1, 2015, at 1:51 PM, Jacques Nadeau <[email protected]> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>> This seems like a form of sideband communication. I think we should have a framework for this type of thing in general rather than a one-off for this particular need. Other forms of sideband might be small-table bloomfilter generation and pushdown into HBase, separate file assignment/partitioning providers balancing/generating scanner workloads, statistics generation for adaptive execution, etc.
> > > > > > > >>>>>
> > > > > > > >>>>> --
> > > > > > > >>>>> Jacques Nadeau
> > > > > > > >>>>> CTO and Co-Founder, Dremio
> > > > > > > >>>>>
> > > > > > > >>>>> On Tue, Dec 1, 2015 at 11:35 AM, Hsuan Yi Chu <[email protected]> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>>> I am trying to deal with the following scenario:
> > > > > > > >>>>>>
> > > > > > > >>>>>> A bunch of minor fragments are doing things in parallel. Each of them could skip some records. Since the downstream minor fragment needs to know the sum of the skipped-record counts from the upstream fragments (in order to display it, or to see if the number exceeds the threshold), each upstream minor fragment needs to pass this scalar with the RecordBatch.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Since this seems to impact the protocol of RecordBatch, I am looking for some advice here.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Thanks.
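As a closing illustration of the record-skipping scenario described in the original question, here is a sketch of the "sideband sink" style of accumulation (the second solution discussed earlier in the thread): fragments report their skip counts independently, and a single sink-side accumulator checks the threshold and signals when the query should be failed. The class and method names are hypothetical and do not correspond to actual Drill code.

// Sketch only, assuming a configurable bound on skipped records.
import java.util.concurrent.atomic.AtomicLong;

public class SkipThresholdSketch {

  /** Sink-side accumulator; the only place that sees the global total. */
  static final class SkipAccumulator {
    private final long threshold;
    private final AtomicLong totalSkipped = new AtomicLong();

    SkipAccumulator(long threshold) {
      this.threshold = threshold;
    }

    /**
     * Called for each sideband report (e.g. { op: "1:1:1", records: [...] })
     * arriving from an upstream fragment. Returns true once the query should fail.
     */
    boolean report(String operatorId, long skippedInBatch) {
      long total = totalSkipped.addAndGet(skippedInBatch);
      System.out.printf("op %s skipped %d (total %d)%n", operatorId, skippedInBatch, total);
      return total > threshold;
    }
  }

  public static void main(String[] args) {
    SkipAccumulator accumulator = new SkipAccumulator(5);   // bound on skipped records

    // Fragments report independently; none needs to know the others' counts.
    boolean fail = accumulator.report("1:1:1", 3);
    fail |= accumulator.report("1:2:1", 2);
    fail |= accumulator.report("1:1:2", 2);                 // total now 7 > 5

    if (fail) {
      // In the discussed design this would be another sideband message to the foreman.
      System.out.println("threshold exceeded; signal foreman to fail the query");
    }
  }
}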
