Steven Jacobs has posted comments on this change.

Change subject: Added Channels to Asterix
......................................................................
Patch Set 7:

(71 comments)

I addressed most of the comments. I am having a little trouble understanding the way to do the locks correctly in the case of Create Channel. It may be that they just aren't ready to be done correctly. The thing to keep in mind is that this is intended as an incremental review. There is no documentation added because users shouldn't actually be using the channel feature yet. Once things are ready for users, there will be docs. This is also why there are some things that aren't bulletproof yet.

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/pom.xml
File asterix-active/pom.xml:

Line 29: <artifactId>asterix-active-jobs</artifactId>
> change to asterix-active just as the directory name. otherwise, this gets c
Done

Line 37: <dependency>
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/active/ActiveActivity.java
File asterix-active/src/main/java/org/apache/asterix/active/ActiveActivity.java:

Line 51: if (!(other instanceof FeedActivity)) {
> this checks for instance of FeedActivity and then cast to ActiveActivity!!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobId.java
File asterix-active/src/main/java/org/apache/asterix/active/ActiveJobId.java:

Line 24:
> Clearly this class is not needed since it is simply a wrapper for ActiveEnt
I had big problems with this originally because in Feeds there are two separate concepts: FeedId and FeedConnectionId. FeedConnectionId could be looked at as FeedId + dataset. But now I want to abstract the code while still separating the two. I need something for general active jobs that FeedConnectionId can simply be an instance of. Hence the creation of ActiveJobId. We don't want activeIds and activeConnectionIds to be treated equally in the abstracted code; even though they hold the same type of data, they are two different things.
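To illustrate the distinction, here is a minimal sketch of the idea. It is not the code in the patch: EntityId, JobId, and describeJob are hypothetical stand-ins, and the field names are assumptions. The point is only that a wrapper type holding the same data can still keep the two kinds of id from being compared or passed interchangeably.

```java
import java.util.Objects;

public class ActiveIdSketch {

    // Hypothetical stand-in for an entity id such as a FeedId.
    static final class EntityId {
        final String dataverse;
        final String name;

        EntityId(String dataverse, String name) {
            this.dataverse = dataverse;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof EntityId)) {
                return false;
            }
            EntityId other = (EntityId) o;
            return dataverse.equals(other.dataverse) && name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(dataverse, name);
        }

        @Override
        public String toString() {
            return dataverse + "." + name;
        }
    }

    // Hypothetical stand-in for a job id: it wraps the same data as an
    // EntityId but is a separate type, so the two never compare as equal.
    static final class JobId {
        final EntityId entityId;

        JobId(EntityId entityId) {
            this.entityId = entityId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof JobId)) {
                return false;
            }
            return entityId.equals(((JobId) o).entityId);
        }

        @Override
        public int hashCode() {
            return 31 * entityId.hashCode() + 1;
        }
    }

    // Accepts only job ids; passing an EntityId here would not compile.
    static String describeJob(JobId id) {
        return "active job for " + id.entityId;
    }

    public static void main(String[] args) {
        EntityId feed = new EntityId("feeds", "TwitterFeed");
        JobId job = new JobId(feed);

        System.out.println(feed.equals(job)); // false: different types
        System.out.println(job.equals(new JobId(new EntityId("feeds", "TwitterFeed")))); // true
        System.out.println(describeJob(job)); // active job for feeds.TwitterFeed
    }
}
```

With a single shared id class, a map keyed by entity ids and a map keyed by job ids could silently accept each other's keys; with the wrapper, the compiler and equals() both keep them apart.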

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/active/api/IActiveMessage.java
File asterix-active/src/main/java/org/apache/asterix/active/api/IActiveMessage.java:

Line 24:
> Note:
Okay, when you are ready to remove it I can mimic your change as well. Right now I am only using a single instance so it should be a small change.

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/active/channel/ChannelRuntime.java
File asterix-active/src/main/java/org/apache/asterix/active/channel/ChannelRuntime.java:

Line 33:
> So this is a hyracks job which gets run based on a frequency?
Actually this is only run on one NC. Right now this operator is given an absolute partition constraint of 1, so it only exists in one location. Eventually channels will be implemented as a fully active pipeline and this operator will go away.

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/active/channel/StoredJobRuntime.java
File asterix-active/src/main/java/org/apache/asterix/active/channel/StoredJobRuntime.java:

Line 51: activeManager.getChannelJobService().runChannelJob(jobSpec);
> this runChannelJob will communicate with the CC and run the job through hyr
Since we use an absolute partitioning constraint of 1, this is only running in one location. Again, this operator will be removed eventually and replaced with a fully active pipeline.

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/active/operators/ActiveMessageOperatorDescriptor.java
File asterix-active/src/main/java/org/apache/asterix/active/operators/ActiveMessageOperatorDescriptor.java:

Line 29:
> I think this needs to go away. you can keep it for now, but I will move fee
Okay, I will mimic your change when it happens.

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/active/operators/ActiveMessageOperatorNodePushable.java
File asterix-active/src/main/java/org/apache/asterix/active/operators/ActiveMessageOperatorNodePushable.java:

Line 69: *
> WS.
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/active/operators/ActiveMetaNodePushable.java
File asterix-active/src/main/java/org/apache/asterix/active/operators/ActiveMetaNodePushable.java:

Line 82: /** The (singleton) instance of IFeedManager **/
> comments don't reflect the new change.
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/active/operators/ChannelMetaOperatorDescriptor.java
File asterix-active/src/main/java/org/apache/asterix/active/operators/ChannelMetaOperatorDescriptor.java:

Line 6: *
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/active/operators/RepetitiveChannelOperatorNodePushable.java
File asterix-active/src/main/java/org/apache/asterix/active/operators/RepetitiveChannelOperatorNodePushable.java:

Line 88:
> what is the writer here typically?
Good point. I'll remove this and see if it still works right.

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/external/feed/api/IActiveManager.java
File asterix-active/src/main/java/org/apache/asterix/external/feed/api/IActiveManager.java:

Line 32: *
> WS!
Done

Line 40: *
> WS!
Done

Line 48: *
> WS!
Done

Line 56: *
> WS!
Done

Line 64: *
> WS!
Done

Line 72: *
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-active/src/main/java/org/apache/asterix/external/feed/api/IAdapterRuntimeManager.java
File asterix-active/src/main/java/org/apache/asterix/external/feed/api/IAdapterRuntimeManager.java:

Line 49: *
> WS!
Done

Line 56: *
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/ReplaceableSinkOperator.java
File asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/ReplaceableSinkOperator.java:

Line 6: *
> WS!
Done

Line 23:
> This is only called from a single location where pipelineEnd is always set
The sink operator is part of Hyracks, and having a sink that isn't really a sink doesn't make much sense in the Hyracks context. I am actually eliminating this operator altogether in my next Channels change (still in progress), though, so it probably doesn't matter either way.

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
File asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java:

Line 142: flushIfNotFailed();
> There is a theoretical chance that writer.close() is called without writer.
I'm not sure where the issue is. isSink is final, and AbstractOneInputOneOutputOneFramePushRuntime.close() can't be called until after AbstractOneInputOneOutputOneFramePushRuntime.open() has finished, right?

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
File asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java:

Line 68: }
> if we follow the suggestion I proposed (removing replacableSinkOperator), t
Please see the comment there. This code is actually going to change a lot in my next Channel change, which is in progress now.

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
File asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java:

Line 298: private final boolean returnRecord;
> Can you explain what are the options for the return values are?
This only allows for options 1 and 2

3. insert () return records
This returns the entirety of the records that were inserted

1. insert () returning fieldname
This returns only the values for this field in the record

I didn't account for the use-case for selecting a subset of fields. Please see the two tests I added in runtime for simple examples.

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
File asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java:

Line 383: insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
> instead of casting stmt multiple times? why not define another variable of
Done

Line 407: Mutable<ILogicalExpression> nameRef = new MutableObject<ILogicalExpression>(
> this will not work for nested fields
This is true. Right now this only allows for returning top-level fields.

Line 424:
> The last two lines of the if and else blocks can be refactored out?
Done

Line 434: IAType outputRecordType = metadataProvider.findOutputRecordType();
> I think this is not the right way to get the output record type. why not ge
I copied this from line 305. I'm not sure exactly what you mean. The insert statement only has the name of the dataset.

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveJobNotificationHandler.java
File asterix-app/src/main/java/org/apache/asterix/app/external/ActiveJobNotificationHandler.java:

Line 25: import java.util.HashMap;
> This class is becoming painful to look at. Or it has been that way since th
Agree :) Maybe also the LifecycleListener

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
File asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java:

Line 277: private final Map<IFeedJoint, List<String>> recoverableIntakeFeedIds;
> WSs!
I'll fix the whitespace, but this does bring up the question of why this code is here if not used. Do you have a future use for it?

Line 383: public void run() {
> WSs!
I'll fix the whitespace, but this does bring up the question of why this code is here if not used. Do you have a future use for it?

Line 426:
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveWorkRequestResponseHandler.java
File asterix-app/src/main/java/org/apache/asterix/app/external/ActiveWorkRequestResponseHandler.java:

Line 157: }
> Can you please add a TODO to get rid of all the thread.sleep calls. I think
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/main/java/org/apache/asterix/app/external/ChannelJobService.java
File asterix-app/src/main/java/org/apache/asterix/app/external/ChannelJobService.java:

Line 74: }
> please, ensure Metadata transactions are always either committed or aborted
Done. I think the test will be unnecessary, as this transaction is also eliminated in the next Channel code.

Line 95: URL url = new URL(targetURL);
> Why do we go to brokers through http?
Brokers don't "exist" on the cluster. The broker endpoint is actually the only way to communicate with them.

Line 108: wr.writeBytes(urlParameters);
> put wr in a try with clause to ensure its closure.
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/main/java/org/apache/asterix/app/external/ChannelOperations.java
File asterix-app/src/main/java/org/apache/asterix/app/external/ChannelOperations.java:

Line 6: *
> WS!
Done

Line 43: *
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
File asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java:

Line 67: *
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
File asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java:

Line 2532: boolean metaDataLock = true;
> metaDataLock variable is not needed since it will always be true
Done

Line 2572:
> Why not get rid of the broker completely since the way I am seeing it now,
The broker is simply a metadata entity that contains an endPoint variable. The endPoint variable is a communication link to the actual broker, which exists outside of Asterix.

Line 2603: channel = MetadataManager.INSTANCE.getChannel(mdTxnCtx, dataverseName, channelName);
> Appropriate locks must be acquired before accessing metadata entities. I ca
I'm not quite sure how to make this work. Maybe we can discuss?

Line 2624: fieldNames.add("subscriptionId");
> make all BAD strings constants and have them in a common place?
Done

Line 2641: //Run both statements together to create datasets
> One of the extremely bad parts in feed which we have agreed that I have to
I'm not sure what such a better way would be.

Line 2671: JobSpecification ChanneljobSpec = handleInsertUpsertStatement(metadataProvider, fStatements.get(0), hcc,
> You're creating a hyracks job that might become invalid when you run it at
The reason it is done this way is to prevent the tasks of parsing, compiling, and rewriting from occurring every time the job is run. We actually have this same issue in Feeds, in the sense that a change to the datasets/indexes that a feed is using will break the feed. Raman's solution to this is to disallow changes to datasets that are being fed.
In any case, this is once again intended as an incremental review, not intended for people to use as is (hence no documentation), and this part of the code actually needs to change drastically before the user version is released.

Line 2718: try {
> what prevents channel datasets from being dropped using a regular drop data
This is on my todo list currently for Channels. Right now there is the potential for just that to occur. This code review doesn't actually provide Channels for use (hence no documentation) but is rather the "incremental" code review that we've been trying for.

Line 2726:
> starting a new transaction before completing the already began transaction?
Done

Line 2729: MetadataLockManager.INSTANCE.dropChannelBegin(dataverseName, dataverseName + "." + channelName);
> why locking twice on the channel?
Done

Line 2745: DropStoredJobOperatorMessage dropChannelMessage = new DropStoredJobOperatorMessage(channelId,
> channel is dropped after metadata changes. this means that the channel job
Done

Line 2750: }
> what if the job fails? what do we do then?
I refactored all of this. Take a look at the new version and see what you think.

Line 2803:
> put commit transaction in a finally clause!
I redid these two to mimic compactEnd. Let me know if there is still an issue.

Line 2849:
> use try-catch-finally for metadata transactions
Done

Line 2929: boolean isChannel) throws Exception {
> isChannel is always false!
Legacy code :) Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/optimizerts/queries/channels/channel-create.aql
File asterix-app/src/test/resources/optimizerts/queries/channels/channel-create.aql:

Line 2: * Description : Check the Plan used by a channel
> WSs!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/optimizerts/queries/channels/channel-subscribe.aql
File asterix-app/src/test/resources/optimizerts/queries/channels/channel-subscribe.aql:

Line 3: * Expected Res : Success
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/optimizerts/queries/channels/channel-unsubscribe.aql
File asterix-app/src/test/resources/optimizerts/queries/channels/channel-unsubscribe.aql:

Line 5: */
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/runtimets/queries/channels/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
File asterix-app/src/test/resources/runtimets/queries/channels/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql:

Line 31: return $tweet.message-text
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/runtimets/queries/channels/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
File asterix-app/src/test/resources/runtimets/queries/channels/create_channel_check_datasets/create_channel_check_datasets.3.query.aql:

Line 4: for $x in dataset Metadata.Dataset
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/runtimets/queries/channels/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
File asterix-app/src/test/resources/runtimets/queries/channels/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql:

Line 31: return $tweet.message-text
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/runtimets/queries/channels/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
File asterix-app/src/test/resources/runtimets/queries/channels/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql:

Line 31: return $tweet.message-text
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/runtimets/queries/channels/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
File asterix-app/src/test/resources/runtimets/queries/channels/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql:

Line 4: for $x in dataset Metadata.Dataset
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/runtimets/queries/channels/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
File asterix-app/src/test/resources/runtimets/queries/channels/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql:

Line 31: return $tweet.message-text
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/runtimets/queries/channels/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
File asterix-app/src/test/resources/runtimets/queries/channels/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql:

Line 31: return $tweet.message-text
> WS!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.2.update.aql
File asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.2.update.aql:

Line 4: * Expected Result : Success
> File not needed!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.2.update.aql
File asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.2.update.aql:

Line 6: */
> File not needed!
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-lang-aql/src/main/javacc/AQL.jj
File asterix-lang-aql/src/main/javacc/AQL.jj:

Line 727: (
> this is not good because there has to be exactly a single space between rep
Done

Line 1179: Pair<Identifier,Identifier> nameComponents = null;
> WS
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
File asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java:

Line 61: *
> WSs!
All done!

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Broker.java
File asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Broker.java:

Line 5: * you may obtain a copy of the License from
> WS
Done

https://asterix-gerrit.ics.uci.edu/#/c/693/7/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Channel.java
File asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Channel.java:

Line 6: *
> WS
Done

--
To view, visit https://asterix-gerrit.ics.uci.edu/693
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I1ba31141e9e89e626a3fbe560e08b73e459af16a
Gerrit-PatchSet: 7
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs
Gerrit-Reviewer: Ildar Absalyamov
Gerrit-Reviewer: Jenkins
Gerrit-Reviewer: Steven Jacobs
Gerrit-Reviewer: Till Westmann
Gerrit-Reviewer: abdullah alamoudi
Gerrit-HasComments: Yes
