http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/main/resources/grpc/python/ipython_pb2.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/grpc/python/ipython_pb2.py b/python/src/main/resources/grpc/python/ipython_pb2.py new file mode 100644 index 0000000..eca3dfe --- /dev/null +++ b/python/src/main/resources/grpc/python/ipython_pb2.py @@ -0,0 +1,751 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: ipython.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf.internal import enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='ipython.proto', + package='ipython', + syntax='proto3', + serialized_pb=_b('\n\ripython.proto\x12\x07ipython\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.ExecuteStatus\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.ipython.OutputType\x12\x0e\n\x06output\x18\x03 \x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 \x01(\x05\"%\n\x12\x43ompletionResponse\x12\x0f\n\x07matches\x18\x01 \x03(\t\"\x0f\n\rStatusRequest\"8\n\x0eStatusResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.IPythonStatus\"\r\n\x0bStopRequest\"\x0e\n\x0cStopResponse*\'\n\rExecuteStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01**\n\rIPythonStatus\x12\x0c\n\x08STARTING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01*!\n\nOutputType\x12\x08\n\x04TEXT\x10\x00\x12\t\n\x05IMAGE\x10\x01\x32\xc3\x02\n\x07IPython\x12@\n\x07\x65xecute\x12\x17.ipython.ExecuteRequest\x1a\x1 8.ipython.ExecuteResponse\"\x00\x30\x01\x12\x45\n\x08\x63omplete\x12\x1a.ipython.CompletionRequest\x1a\x1b.ipython.CompletionResponse\"\x00\x12;\n\x06\x63\x61ncel\x12\x16.ipython.CancelRequest\x1a\x17.ipython.CancelResponse\"\x00\x12;\n\x06status\x12\x16.ipython.StatusRequest\x1a\x17.ipython.StatusResponse\"\x00\x12\x35\n\x04stop\x12\x14.ipython.StopRequest\x1a\x15.ipython.StopResponse\"\x00\x42<\n org.apache.zeppelin.python.protoB\x0cIPythonProtoP\x01\xa2\x02\x07IPythonb\x06proto3') +) + +_EXECUTESTATUS = _descriptor.EnumDescriptor( + name='ExecuteStatus', + full_name='ipython.ExecuteStatus', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='SUCCESS', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='ERROR', index=1, number=1, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=399, + serialized_end=438, +) +_sym_db.RegisterEnumDescriptor(_EXECUTESTATUS) + +ExecuteStatus = enum_type_wrapper.EnumTypeWrapper(_EXECUTESTATUS) +_IPYTHONSTATUS = _descriptor.EnumDescriptor( + name='IPythonStatus', + full_name='ipython.IPythonStatus', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='STARTING', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='RUNNING', index=1, number=1, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=440, + serialized_end=482, +) +_sym_db.RegisterEnumDescriptor(_IPYTHONSTATUS) + +IPythonStatus = enum_type_wrapper.EnumTypeWrapper(_IPYTHONSTATUS) +_OUTPUTTYPE = _descriptor.EnumDescriptor( + name='OutputType', + full_name='ipython.OutputType', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='TEXT', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='IMAGE', index=1, number=1, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=484, + serialized_end=517, +) +_sym_db.RegisterEnumDescriptor(_OUTPUTTYPE) + +OutputType = enum_type_wrapper.EnumTypeWrapper(_OUTPUTTYPE) +SUCCESS = 0 +ERROR = 1 +STARTING = 0 +RUNNING = 1 +TEXT = 0 +IMAGE = 1 + + + +_EXECUTEREQUEST = _descriptor.Descriptor( + name='ExecuteRequest', + full_name='ipython.ExecuteRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='code', full_name='ipython.ExecuteRequest.code', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=26, + serialized_end=56, +) + + +_EXECUTERESPONSE = _descriptor.Descriptor( + name='ExecuteResponse', + full_name='ipython.ExecuteResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='status', full_name='ipython.ExecuteResponse.status', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='type', full_name='ipython.ExecuteResponse.type', index=1, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='output', full_name='ipython.ExecuteResponse.output', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=58, + serialized_end=166, +) + + +_CANCELREQUEST = _descriptor.Descriptor( + name='CancelRequest', + full_name='ipython.CancelRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=168, + serialized_end=183, +) + + +_CANCELRESPONSE = _descriptor.Descriptor( + name='CancelResponse', + full_name='ipython.CancelResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=185, + serialized_end=201, +) + + +_COMPLETIONREQUEST = _descriptor.Descriptor( + name='CompletionRequest', + full_name='ipython.CompletionRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='code', full_name='ipython.CompletionRequest.code', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='cursor', full_name='ipython.CompletionRequest.cursor', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=203, + serialized_end=252, +) + + +_COMPLETIONRESPONSE = _descriptor.Descriptor( + name='CompletionResponse', + full_name='ipython.CompletionResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='matches', full_name='ipython.CompletionResponse.matches', index=0, + number=1, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=254, + serialized_end=291, +) + + +_STATUSREQUEST = _descriptor.Descriptor( + name='StatusRequest', + full_name='ipython.StatusRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=293, + serialized_end=308, +) + + +_STATUSRESPONSE = _descriptor.Descriptor( + name='StatusResponse', + full_name='ipython.StatusResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='status', full_name='ipython.StatusResponse.status', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=310, + serialized_end=366, +) + + +_STOPREQUEST = _descriptor.Descriptor( + name='StopRequest', + full_name='ipython.StopRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=368, + serialized_end=381, +) + + +_STOPRESPONSE = _descriptor.Descriptor( + name='StopResponse', + full_name='ipython.StopResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=383, + serialized_end=397, +) + +_EXECUTERESPONSE.fields_by_name['status'].enum_type = _EXECUTESTATUS +_EXECUTERESPONSE.fields_by_name['type'].enum_type = _OUTPUTTYPE +_STATUSRESPONSE.fields_by_name['status'].enum_type = _IPYTHONSTATUS +DESCRIPTOR.message_types_by_name['ExecuteRequest'] = _EXECUTEREQUEST +DESCRIPTOR.message_types_by_name['ExecuteResponse'] = _EXECUTERESPONSE +DESCRIPTOR.message_types_by_name['CancelRequest'] = _CANCELREQUEST +DESCRIPTOR.message_types_by_name['CancelResponse'] = _CANCELRESPONSE +DESCRIPTOR.message_types_by_name['CompletionRequest'] = _COMPLETIONREQUEST +DESCRIPTOR.message_types_by_name['CompletionResponse'] = _COMPLETIONRESPONSE +DESCRIPTOR.message_types_by_name['StatusRequest'] = _STATUSREQUEST +DESCRIPTOR.message_types_by_name['StatusResponse'] = _STATUSRESPONSE +DESCRIPTOR.message_types_by_name['StopRequest'] = _STOPREQUEST +DESCRIPTOR.message_types_by_name['StopResponse'] = _STOPRESPONSE +DESCRIPTOR.enum_types_by_name['ExecuteStatus'] = _EXECUTESTATUS +DESCRIPTOR.enum_types_by_name['IPythonStatus'] = _IPYTHONSTATUS +DESCRIPTOR.enum_types_by_name['OutputType'] = _OUTPUTTYPE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +ExecuteRequest = _reflection.GeneratedProtocolMessageType('ExecuteRequest', (_message.Message,), dict( + DESCRIPTOR = _EXECUTEREQUEST, + __module__ = 'ipython_pb2' + # @@protoc_insertion_point(class_scope:ipython.ExecuteRequest) + )) +_sym_db.RegisterMessage(ExecuteRequest) + +ExecuteResponse = _reflection.GeneratedProtocolMessageType('ExecuteResponse', (_message.Message,), dict( + DESCRIPTOR = _EXECUTERESPONSE, + __module__ = 'ipython_pb2' + # @@protoc_insertion_point(class_scope:ipython.ExecuteResponse) + )) +_sym_db.RegisterMessage(ExecuteResponse) + +CancelRequest = _reflection.GeneratedProtocolMessageType('CancelRequest', (_message.Message,), dict( + DESCRIPTOR = _CANCELREQUEST, + __module__ = 'ipython_pb2' + # @@protoc_insertion_point(class_scope:ipython.CancelRequest) + )) +_sym_db.RegisterMessage(CancelRequest) + +CancelResponse = _reflection.GeneratedProtocolMessageType('CancelResponse', (_message.Message,), dict( + DESCRIPTOR = _CANCELRESPONSE, + __module__ = 'ipython_pb2' + # @@protoc_insertion_point(class_scope:ipython.CancelResponse) + )) +_sym_db.RegisterMessage(CancelResponse) + +CompletionRequest = _reflection.GeneratedProtocolMessageType('CompletionRequest', (_message.Message,), dict( + DESCRIPTOR = _COMPLETIONREQUEST, + __module__ = 'ipython_pb2' + # @@protoc_insertion_point(class_scope:ipython.CompletionRequest) + )) +_sym_db.RegisterMessage(CompletionRequest) + +CompletionResponse = _reflection.GeneratedProtocolMessageType('CompletionResponse', (_message.Message,), dict( + DESCRIPTOR = _COMPLETIONRESPONSE, + __module__ = 'ipython_pb2' + # @@protoc_insertion_point(class_scope:ipython.CompletionResponse) + )) +_sym_db.RegisterMessage(CompletionResponse) + +StatusRequest = _reflection.GeneratedProtocolMessageType('StatusRequest', (_message.Message,), dict( + DESCRIPTOR = _STATUSREQUEST, + __module__ = 'ipython_pb2' + # @@protoc_insertion_point(class_scope:ipython.StatusRequest) + )) +_sym_db.RegisterMessage(StatusRequest) + +StatusResponse = _reflection.GeneratedProtocolMessageType('StatusResponse', (_message.Message,), dict( + DESCRIPTOR = _STATUSRESPONSE, + __module__ = 'ipython_pb2' + # @@protoc_insertion_point(class_scope:ipython.StatusResponse) + )) +_sym_db.RegisterMessage(StatusResponse) + +StopRequest = _reflection.GeneratedProtocolMessageType('StopRequest', (_message.Message,), dict( + DESCRIPTOR = _STOPREQUEST, + __module__ = 'ipython_pb2' + # @@protoc_insertion_point(class_scope:ipython.StopRequest) + )) +_sym_db.RegisterMessage(StopRequest) + +StopResponse = _reflection.GeneratedProtocolMessageType('StopResponse', (_message.Message,), dict( + DESCRIPTOR = _STOPRESPONSE, + __module__ = 'ipython_pb2' + # @@protoc_insertion_point(class_scope:ipython.StopResponse) + )) +_sym_db.RegisterMessage(StopResponse) + + +DESCRIPTOR.has_options = True +DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n org.apache.zeppelin.python.protoB\014IPythonProtoP\001\242\002\007IPython')) +try: + # THESE ELEMENTS WILL BE DEPRECATED. + # Please use the generated *_pb2_grpc.py files instead. + import grpc + from grpc.beta import implementations as beta_implementations + from grpc.beta import interfaces as beta_interfaces + from grpc.framework.common import cardinality + from grpc.framework.interfaces.face import utilities as face_utilities + + + class IPythonStub(object): + """The IPython service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.execute = channel.unary_stream( + '/ipython.IPython/execute', + request_serializer=ExecuteRequest.SerializeToString, + response_deserializer=ExecuteResponse.FromString, + ) + self.complete = channel.unary_unary( + '/ipython.IPython/complete', + request_serializer=CompletionRequest.SerializeToString, + response_deserializer=CompletionResponse.FromString, + ) + self.cancel = channel.unary_unary( + '/ipython.IPython/cancel', + request_serializer=CancelRequest.SerializeToString, + response_deserializer=CancelResponse.FromString, + ) + self.status = channel.unary_unary( + '/ipython.IPython/status', + request_serializer=StatusRequest.SerializeToString, + response_deserializer=StatusResponse.FromString, + ) + self.stop = channel.unary_unary( + '/ipython.IPython/stop', + request_serializer=StopRequest.SerializeToString, + response_deserializer=StopResponse.FromString, + ) + + + class IPythonServicer(object): + """The IPython service definition. + """ + + def execute(self, request, context): + """Sends code + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def complete(self, request, context): + """Get completion + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def cancel(self, request, context): + """Cancel the running statement + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def status(self, request, context): + """Get ipython kernel status + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def stop(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + + def add_IPythonServicer_to_server(servicer, server): + rpc_method_handlers = { + 'execute': grpc.unary_stream_rpc_method_handler( + servicer.execute, + request_deserializer=ExecuteRequest.FromString, + response_serializer=ExecuteResponse.SerializeToString, + ), + 'complete': grpc.unary_unary_rpc_method_handler( + servicer.complete, + request_deserializer=CompletionRequest.FromString, + response_serializer=CompletionResponse.SerializeToString, + ), + 'cancel': grpc.unary_unary_rpc_method_handler( + servicer.cancel, + request_deserializer=CancelRequest.FromString, + response_serializer=CancelResponse.SerializeToString, + ), + 'status': grpc.unary_unary_rpc_method_handler( + servicer.status, + request_deserializer=StatusRequest.FromString, + response_serializer=StatusResponse.SerializeToString, + ), + 'stop': grpc.unary_unary_rpc_method_handler( + servicer.stop, + request_deserializer=StopRequest.FromString, + response_serializer=StopResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'ipython.IPython', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + class BetaIPythonServicer(object): + """The Beta API is deprecated for 0.15.0 and later. + + It is recommended to use the GA API (classes and functions in this + file not marked beta) for all further purposes. This class was generated + only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.""" + """The IPython service definition. + """ + def execute(self, request, context): + """Sends code + """ + context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) + def complete(self, request, context): + """Get completion + """ + context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) + def cancel(self, request, context): + """Cancel the running statement + """ + context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) + def status(self, request, context): + """Get ipython kernel status + """ + context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) + def stop(self, request, context): + # missing associated documentation comment in .proto file + pass + context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) + + + class BetaIPythonStub(object): + """The Beta API is deprecated for 0.15.0 and later. + + It is recommended to use the GA API (classes and functions in this + file not marked beta) for all further purposes. This class was generated + only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.""" + """The IPython service definition. + """ + def execute(self, request, timeout, metadata=None, with_call=False, protocol_options=None): + """Sends code + """ + raise NotImplementedError() + def complete(self, request, timeout, metadata=None, with_call=False, protocol_options=None): + """Get completion + """ + raise NotImplementedError() + complete.future = None + def cancel(self, request, timeout, metadata=None, with_call=False, protocol_options=None): + """Cancel the running statement + """ + raise NotImplementedError() + cancel.future = None + def status(self, request, timeout, metadata=None, with_call=False, protocol_options=None): + """Get ipython kernel status + """ + raise NotImplementedError() + status.future = None + def stop(self, request, timeout, metadata=None, with_call=False, protocol_options=None): + # missing associated documentation comment in .proto file + pass + raise NotImplementedError() + stop.future = None + + + def beta_create_IPython_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None): + """The Beta API is deprecated for 0.15.0 and later. + + It is recommended to use the GA API (classes and functions in this + file not marked beta) for all further purposes. This function was + generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0""" + request_deserializers = { + ('ipython.IPython', 'cancel'): CancelRequest.FromString, + ('ipython.IPython', 'complete'): CompletionRequest.FromString, + ('ipython.IPython', 'execute'): ExecuteRequest.FromString, + ('ipython.IPython', 'status'): StatusRequest.FromString, + ('ipython.IPython', 'stop'): StopRequest.FromString, + } + response_serializers = { + ('ipython.IPython', 'cancel'): CancelResponse.SerializeToString, + ('ipython.IPython', 'complete'): CompletionResponse.SerializeToString, + ('ipython.IPython', 'execute'): ExecuteResponse.SerializeToString, + ('ipython.IPython', 'status'): StatusResponse.SerializeToString, + ('ipython.IPython', 'stop'): StopResponse.SerializeToString, + } + method_implementations = { + ('ipython.IPython', 'cancel'): face_utilities.unary_unary_inline(servicer.cancel), + ('ipython.IPython', 'complete'): face_utilities.unary_unary_inline(servicer.complete), + ('ipython.IPython', 'execute'): face_utilities.unary_stream_inline(servicer.execute), + ('ipython.IPython', 'status'): face_utilities.unary_unary_inline(servicer.status), + ('ipython.IPython', 'stop'): face_utilities.unary_unary_inline(servicer.stop), + } + server_options = beta_implementations.server_options(request_deserializers=request_deserializers, response_serializers=response_serializers, thread_pool=pool, thread_pool_size=pool_size, default_timeout=default_timeout, maximum_timeout=maximum_timeout) + return beta_implementations.server(method_implementations, options=server_options) + + + def beta_create_IPython_stub(channel, host=None, metadata_transformer=None, pool=None, pool_size=None): + """The Beta API is deprecated for 0.15.0 and later. + + It is recommended to use the GA API (classes and functions in this + file not marked beta) for all further purposes. This function was + generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0""" + request_serializers = { + ('ipython.IPython', 'cancel'): CancelRequest.SerializeToString, + ('ipython.IPython', 'complete'): CompletionRequest.SerializeToString, + ('ipython.IPython', 'execute'): ExecuteRequest.SerializeToString, + ('ipython.IPython', 'status'): StatusRequest.SerializeToString, + ('ipython.IPython', 'stop'): StopRequest.SerializeToString, + } + response_deserializers = { + ('ipython.IPython', 'cancel'): CancelResponse.FromString, + ('ipython.IPython', 'complete'): CompletionResponse.FromString, + ('ipython.IPython', 'execute'): ExecuteResponse.FromString, + ('ipython.IPython', 'status'): StatusResponse.FromString, + ('ipython.IPython', 'stop'): StopResponse.FromString, + } + cardinalities = { + 'cancel': cardinality.Cardinality.UNARY_UNARY, + 'complete': cardinality.Cardinality.UNARY_UNARY, + 'execute': cardinality.Cardinality.UNARY_STREAM, + 'status': cardinality.Cardinality.UNARY_UNARY, + 'stop': cardinality.Cardinality.UNARY_UNARY, + } + stub_options = beta_implementations.stub_options(host=host, metadata_transformer=metadata_transformer, request_serializers=request_serializers, response_deserializers=response_deserializers, thread_pool=pool, thread_pool_size=pool_size) + return beta_implementations.dynamic_stub(channel, 'ipython.IPython', cardinalities, options=stub_options) +except ImportError: + pass +# @@protoc_insertion_point(module_scope)
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/main/resources/grpc/python/ipython_pb2_grpc.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/grpc/python/ipython_pb2_grpc.py b/python/src/main/resources/grpc/python/ipython_pb2_grpc.py new file mode 100644 index 0000000..a590319 --- /dev/null +++ b/python/src/main/resources/grpc/python/ipython_pb2_grpc.py @@ -0,0 +1,129 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import ipython_pb2 as ipython__pb2 + + +class IPythonStub(object): + """The IPython service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.execute = channel.unary_stream( + '/ipython.IPython/execute', + request_serializer=ipython__pb2.ExecuteRequest.SerializeToString, + response_deserializer=ipython__pb2.ExecuteResponse.FromString, + ) + self.complete = channel.unary_unary( + '/ipython.IPython/complete', + request_serializer=ipython__pb2.CompletionRequest.SerializeToString, + response_deserializer=ipython__pb2.CompletionResponse.FromString, + ) + self.cancel = channel.unary_unary( + '/ipython.IPython/cancel', + request_serializer=ipython__pb2.CancelRequest.SerializeToString, + response_deserializer=ipython__pb2.CancelResponse.FromString, + ) + self.status = channel.unary_unary( + '/ipython.IPython/status', + request_serializer=ipython__pb2.StatusRequest.SerializeToString, + response_deserializer=ipython__pb2.StatusResponse.FromString, + ) + self.stop = channel.unary_unary( + '/ipython.IPython/stop', + request_serializer=ipython__pb2.StopRequest.SerializeToString, + response_deserializer=ipython__pb2.StopResponse.FromString, + ) + + +class IPythonServicer(object): + """The IPython service definition. + """ + + def execute(self, request, context): + """Sends code + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def complete(self, request, context): + """Get completion + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def cancel(self, request, context): + """Cancel the running statement + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def status(self, request, context): + """Get ipython kernel status + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def stop(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_IPythonServicer_to_server(servicer, server): + rpc_method_handlers = { + 'execute': grpc.unary_stream_rpc_method_handler( + servicer.execute, + request_deserializer=ipython__pb2.ExecuteRequest.FromString, + response_serializer=ipython__pb2.ExecuteResponse.SerializeToString, + ), + 'complete': grpc.unary_unary_rpc_method_handler( + servicer.complete, + request_deserializer=ipython__pb2.CompletionRequest.FromString, + response_serializer=ipython__pb2.CompletionResponse.SerializeToString, + ), + 'cancel': grpc.unary_unary_rpc_method_handler( + servicer.cancel, + request_deserializer=ipython__pb2.CancelRequest.FromString, + response_serializer=ipython__pb2.CancelResponse.SerializeToString, + ), + 'status': grpc.unary_unary_rpc_method_handler( + servicer.status, + request_deserializer=ipython__pb2.StatusRequest.FromString, + response_serializer=ipython__pb2.StatusResponse.SerializeToString, + ), + 'stop': grpc.unary_unary_rpc_method_handler( + servicer.stop, + request_deserializer=ipython__pb2.StopRequest.FromString, + response_serializer=ipython__pb2.StopResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'ipython.IPython', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/main/resources/grpc/python/ipython_server.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/python/src/main/resources/grpc/python/ipython_server.py new file mode 100644 index 0000000..1d92766 --- /dev/null +++ b/python/src/main/resources/grpc/python/ipython_server.py @@ -0,0 +1,155 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import jupyter_client +import sys +import threading +import time +from concurrent import futures + +import grpc +import ipython_pb2 +import ipython_pb2_grpc + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 + + +is_py2 = sys.version[0] == '2' +if is_py2: + import Queue as queue +else: + import queue as queue + + +TIMEOUT = 30 + +class IPython(ipython_pb2_grpc.IPythonServicer): + + def __init__(self, server): + self._status = ipython_pb2.STARTING + self._server = server + + def start(self): + print("starting...") + sys.stdout.flush() + self._km, self._kc = jupyter_client.manager.start_new_kernel(kernel_name='python') + self._status = ipython_pb2.RUNNING + + def execute(self, request, context): + print("execute code: " + request.code) + sys.stdout.flush() + stdout_queue = queue.Queue(maxsize = 10) + stderr_queue = queue.Queue(maxsize = 10) + image_queue = queue.Queue(maxsize = 5) + + def _output_hook(msg): + msg_type = msg['header']['msg_type'] + content = msg['content'] + if msg_type == 'stream': + stdout_queue.put(content['text']) + elif msg_type in ('display_data', 'execute_result'): + stdout_queue.put(content['data'].get('text/plain', '')) + if 'image/png' in content['data']: + image_queue.put(content['data']['image/png']) + elif msg_type == 'error': + stderr_queue.put('\n'.join(content['traceback'])) + + + payload_reply = [] + def execute_worker(): + reply = self._kc.execute_interactive(request.code, + output_hook=_output_hook, + timeout=TIMEOUT) + payload_reply.append(reply) + + t = threading.Thread(name="ConsumerThread", target=execute_worker) + t.start() + + while t.is_alive(): + while not stdout_queue.empty(): + output = stdout_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, + type=ipython_pb2.TEXT, + output=output) + while not stderr_queue.empty(): + output = stderr_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR, + type=ipython_pb2.TEXT, + output=output) + while not image_queue.empty(): + output = image_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, + type=ipython_pb2.IMAGE, + output=output) + + while not stdout_queue.empty(): + output = stdout_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, + type=ipython_pb2.TEXT, + output=output) + while not stderr_queue.empty(): + output = stderr_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR, + type=ipython_pb2.TEXT, + output=output) + while not image_queue.empty(): + output = image_queue.get() + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, + type=ipython_pb2.IMAGE, + output=output) + + if payload_reply: + result = [] + for payload in payload_reply[0]['content']['payload']: + if payload['data']['text/plain']: + result.append(payload['data']['text/plain']) + if result: + yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, + type=ipython_pb2.TEXT, + output='\n'.join(result)) + + def cancel(self, request, context): + self._km.interrupt_kernel() + return ipython_pb2.CancelResponse() + + def complete(self, request, context): + reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=TIMEOUT) + return ipython_pb2.CompletionResponse(matches=reply['content']['matches']) + + def status(self, request, context): + return ipython_pb2.StatusResponse(status = self._status) + + def stop(self, request, context): + self._server.stop(0) + sys.exit(0) + + +def serve(port): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + ipython = IPython(server) + ipython_pb2_grpc.add_IPythonServicer_to_server(ipython, server) + server.add_insecure_port('[::]:' + port) + server.start() + ipython.start() + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(0) + +if __name__ == '__main__': + serve(sys.argv[1]) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/main/resources/grpc/python/zeppelin_python.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/grpc/python/zeppelin_python.py b/python/src/main/resources/grpc/python/zeppelin_python.py new file mode 100644 index 0000000..0f5638f --- /dev/null +++ b/python/src/main/resources/grpc/python/zeppelin_python.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from py4j.java_gateway import java_import, JavaGateway, GatewayClient + +from io import BytesIO +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +class PyZeppelinContext(object): + """ A context impl that uses Py4j to communicate to JVM + """ + + def __init__(self, z): + self.z = z + self.paramOption = gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption + self.javaList = gateway.jvm.java.util.ArrayList + self.max_result = z.getMaxResult() + + def input(self, name, defaultValue=""): + return self.z.getGui().input(name, defaultValue) + + def select(self, name, options, defaultValue=""): + javaOptions = gateway.new_array(self.paramOption, len(options)) + i = 0 + for tuple in options: + javaOptions[i] = self.paramOption(tuple[0], tuple[1]) + i += 1 + return self.z.getGui().select(name, defaultValue, javaOptions) + + def checkbox(self, name, options, defaultChecked=[]): + javaOptions = gateway.new_array(self.paramOption, len(options)) + i = 0 + for tuple in options: + javaOptions[i] = self.paramOption(tuple[0], tuple[1]) + i += 1 + javaDefaultCheck = self.javaList() + for check in defaultChecked: + javaDefaultCheck.append(check) + return self.z.getGui().checkbox(name, javaDefaultCheck, javaOptions) + + def show(self, p, **kwargs): + if type(p).__name__ == "DataFrame": # does not play well with sub-classes + # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame` + # and so a dependency on pandas + self.show_dataframe(p, **kwargs) + elif hasattr(p, '__call__'): + p() #error reporting + + def show_dataframe(self, df, show_index=False, **kwargs): + """Pretty prints DF using Table Display System + """ + limit = len(df) > self.max_result + header_buf = StringIO("") + if show_index: + idx_name = str(df.index.name) if df.index.name is not None else "" + header_buf.write(idx_name + "\t") + header_buf.write(str(df.columns[0])) + for col in df.columns[1:]: + header_buf.write("\t") + header_buf.write(str(col)) + header_buf.write("\n") + + body_buf = StringIO("") + rows = df.head(self.max_result).values if limit else df.values + index = df.index.values + for idx, row in zip(index, rows): + if show_index: + body_buf.write("%html <strong>{}</strong>".format(idx)) + body_buf.write("\t") + body_buf.write(str(row[0])) + for cell in row[1:]: + body_buf.write("\t") + body_buf.write(str(cell)) + body_buf.write("\n") + body_buf.seek(0); header_buf.seek(0) + #TODO(bzz): fix it, so it shows red notice, as in Spark + print("%table " + header_buf.read() + body_buf.read()) # + + # ("\n<font color=red>Results are limited by {}.</font>" \ + # .format(self.max_result) if limit else "") + #) + body_buf.close(); header_buf.close() + + +# start JVM gateway +client = GatewayClient(address='127.0.0.1', port=${JVM_GATEWAY_PORT}) +gateway = JavaGateway(client) +java_import(gateway.jvm, "org.apache.zeppelin.display.Input") +intp = gateway.entry_point +z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext()) + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/python/src/main/resources/interpreter-setting.json b/python/src/main/resources/interpreter-setting.json index 3bc42b8..bc1a746 100644 --- a/python/src/main/resources/interpreter-setting.json +++ b/python/src/main/resources/interpreter-setting.json @@ -17,6 +17,29 @@ "defaultValue": "1000", "description": "Max number of dataframe rows to display.", "type": "number" + }, + "zeppelin.python.useIPython": { + "propertyName": "zeppelin.python.useIPython", + "defaultValue": true, + "description": "whether use IPython when it is available", + "type": "checkbox" + } + }, + "editor": { + "language": "python", + "editOnDblClick": false + } + }, + { + "group": "python", + "name": "ipython", + "className": "org.apache.zeppelin.python.IPythonInterpreter", + "properties": { + "zeppelin.ipython.launch.timeout": { + "propertyName": "zeppelin.ipython.launch.timeout", + "defaultValue": "30000", + "description": "time out for ipython launch", + "type": "number" } }, "editor": { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java new file mode 100644 index 0000000..137d622 --- /dev/null +++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.python; + +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.display.ui.CheckBox; +import org.apache.zeppelin.display.ui.Select; +import org.apache.zeppelin.display.ui.TextBox; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterOutputListener; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CopyOnWriteArrayList; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; + + +public class IPythonInterpreterTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreterTest.class); + private IPythonInterpreter interpreter; + + @Before + public void setUp() { + Properties properties = new Properties(); + interpreter = new IPythonInterpreter(properties); + InterpreterGroup mockInterpreterGroup = mock(InterpreterGroup.class); + interpreter.setInterpreterGroup(mockInterpreterGroup); + interpreter.open(); + } + + @After + public void close() { + interpreter.close(); + } + + + @Test + public void testIPython() throws IOException, InterruptedException { + testInterpreter(interpreter); + } + + public static void testInterpreter(final Interpreter interpreter) throws IOException, InterruptedException { + // to make this test can run under both python2 and python3 + InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + // single output without print + InterpreterContext context = getInterpreterContext(); + result = interpreter.interpret("'hello world'", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertEquals("'hello world'", interpreterResultMessages.get(0).getData()); + + // only the last statement is printed + context = getInterpreterContext(); + result = interpreter.interpret("'hello world'\n'hello world2'", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertEquals("'hello world2'", interpreterResultMessages.get(0).getData()); + + // single output + context = getInterpreterContext(); + result = interpreter.interpret("print('hello world')", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertEquals("hello world\n", interpreterResultMessages.get(0).getData()); + + // multiple output + context = getInterpreterContext(); + result = interpreter.interpret("print('hello world')\nprint('hello world2')", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertEquals("hello world\nhello world2\n", interpreterResultMessages.get(0).getData()); + + // assignment + context = getInterpreterContext(); + result = interpreter.interpret("abc=1",context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(0, interpreterResultMessages.size()); + + // if block + context = getInterpreterContext(); + result = interpreter.interpret("if abc > 0:\n\tprint('True')\nelse:\n\tprint('False')", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertEquals("True\n", interpreterResultMessages.get(0).getData()); + + // for loop + context = getInterpreterContext(); + result = interpreter.interpret("for i in range(3):\n\tprint(i)", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertEquals("0\n1\n2\n", interpreterResultMessages.get(0).getData()); + + // syntax error + context = getInterpreterContext(); + result = interpreter.interpret("print(unknown)", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertTrue(interpreterResultMessages.get(0).getData().contains("name 'unknown' is not defined")); + + // raise runtime exception + context = getInterpreterContext(); + result = interpreter.interpret("1/0", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertTrue(interpreterResultMessages.get(0).getData().contains("ZeroDivisionError")); + + // ZEPPELIN-1133 + context = getInterpreterContext(); + result = interpreter.interpret("def greet(name):\n" + + " print('Hello', name)\n" + + "greet('Jack')", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertEquals("Hello Jack\n",interpreterResultMessages.get(0).getData()); + + // ZEPPELIN-1114 + context = getInterpreterContext(); + result = interpreter.interpret("print('there is no Error: ok')", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertEquals("there is no Error: ok\n", interpreterResultMessages.get(0).getData()); + + // completion + context = getInterpreterContext(); + List<InterpreterCompletion> completions = interpreter.completion("ab", 2, context); + assertEquals(2, completions.size()); + assertEquals("abc", completions.get(0).getValue()); + assertEquals("abs", completions.get(1).getValue()); + + context = getInterpreterContext(); + interpreter.interpret("import sys", context); + completions = interpreter.completion("sys.", 4, context); + assertFalse(completions.isEmpty()); + + context = getInterpreterContext(); + completions = interpreter.completion("sys.std", 7, context); + assertEquals(3, completions.size()); + assertEquals("sys.stderr", completions.get(0).getValue()); + assertEquals("sys.stdin", completions.get(1).getValue()); + assertEquals("sys.stdout", completions.get(2).getValue()); + + // there's no completion for 'a.' because it is not recognized by compiler for now. + context = getInterpreterContext(); + String st = "a='hello'\na."; + completions = interpreter.completion(st, st.length(), context); + assertEquals(0, completions.size()); + + // define `a` first + context = getInterpreterContext(); + st = "a='hello'"; + result = interpreter.interpret(st, context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(0, interpreterResultMessages.size()); + + // now we can get the completion for `a.` + context = getInterpreterContext(); + st = "a."; + completions = interpreter.completion(st, st.length(), context); + // it is different for python2 and python3 and may even different for different minor version + // so only verify it is larger than 20 + assertTrue(completions.size() > 20); + + context = getInterpreterContext(); + st = "a.co"; + completions = interpreter.completion(st, st.length(), context); + assertEquals(1, completions.size()); + assertEquals("a.count", completions.get(0).getValue()); + + // cursor is in the middle of code + context = getInterpreterContext(); + st = "a.co\b='hello"; + completions = interpreter.completion(st, 4, context); + assertEquals(1, completions.size()); + assertEquals("a.count", completions.get(0).getValue()); + + // ipython help + context = getInterpreterContext(); + result = interpreter.interpret("range?", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertTrue(interpreterResultMessages.get(0).getData().contains("range(stop)")); + + // timeit + context = getInterpreterContext(); + result = interpreter.interpret("%timeit range(100)", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertTrue(interpreterResultMessages.get(0).getData().contains("loops")); + + // cancel + final InterpreterContext context2 = getInterpreterContext(); + new Thread() { + @Override + public void run() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + interpreter.cancel(context2); + } + }.start(); + result = interpreter.interpret("import time\ntime.sleep(10)", context2); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + interpreterResultMessages = context2.out.getInterpreterResultMessages(); + assertTrue(interpreterResultMessages.get(0).getData().contains("KeyboardInterrupt")); + + // matplotlib + context = getInterpreterContext(); + result = interpreter.interpret("%matplotlib inline\nimport matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + // the order of IMAGE and TEXT is not determined + // check there must be one IMAGE output + boolean hasImageOutput = false; + for (InterpreterResultMessage msg : interpreterResultMessages) { + if (msg.getType() == InterpreterResult.Type.IMG) { + hasImageOutput = true; + } + } + assertTrue("No Image Output", hasImageOutput); + + // bokeh + // bokeh initialization + context = getInterpreterContext(); + result = interpreter.interpret("from bokeh.io import output_notebook, show\n" + + "from bokeh.plotting import figure\n" + + "output_notebook(notebook_type='zeppelin')", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(2, interpreterResultMessages.size()); + assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType()); + assertTrue(interpreterResultMessages.get(0).getData().contains("Loading BokehJS")); + assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType()); + assertTrue(interpreterResultMessages.get(1).getData().contains("BokehJS is being loaded")); + + // bokeh plotting + context = getInterpreterContext(); + result = interpreter.interpret("from bokeh.plotting import figure, output_file, show\n" + + "x = [1, 2, 3, 4, 5]\n" + + "y = [6, 7, 2, 4, 5]\n" + + "p = figure(title=\"simple line example\", x_axis_label='x', y_axis_label='y')\n" + + "p.line(x, y, legend=\"Temp.\", line_width=2)\n" + + "show(p)", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(1, interpreterResultMessages.size()); + assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType()); + // docs_json is the source data of plotting which bokeh would use to render the plotting. + assertTrue(interpreterResultMessages.get(0).getData().contains("docs_json")); + + // ggplot + context = getInterpreterContext(); + result = interpreter.interpret("from ggplot import *\n" + + "ggplot(diamonds, aes(x='price', fill='cut')) +\\\n" + + " geom_density(alpha=0.25) +\\\n" + + " facet_wrap(\"clarity\")", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + // the order of IMAGE and TEXT is not determined + // check there must be one IMAGE output + hasImageOutput = false; + for (InterpreterResultMessage msg : interpreterResultMessages) { + if (msg.getType() == InterpreterResult.Type.IMG) { + hasImageOutput = true; + } + } + assertTrue("No Image Output", hasImageOutput); + + // ZeppelinContext + + // TextBox + context = getInterpreterContext(); + result = interpreter.interpret("z.input(name='text_1', defaultValue='value_1')", context); + Thread.sleep(100); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertTrue(interpreterResultMessages.get(0).getData().contains("'value_1'")); + assertEquals(1, context.getGui().getForms().size()); + assertTrue(context.getGui().getForms().get("text_1") instanceof TextBox); + TextBox textbox = (TextBox) context.getGui().getForms().get("text_1"); + assertEquals("text_1", textbox.getName()); + assertEquals("value_1", textbox.getDefaultValue()); + + // Select + context = getInterpreterContext(); + result = interpreter.interpret("z.select(name='select_1', options=[('value_1', 'name_1'), ('value_2', 'name_2')])", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, context.getGui().getForms().size()); + assertTrue(context.getGui().getForms().get("select_1") instanceof Select); + Select select = (Select) context.getGui().getForms().get("select_1"); + assertEquals("select_1", select.getName()); + assertEquals(2, select.getOptions().length); + assertEquals("name_1", select.getOptions()[0].getDisplayName()); + assertEquals("value_1", select.getOptions()[0].getValue()); + + // CheckBox + context = getInterpreterContext(); + result = interpreter.interpret("z.checkbox(name='checkbox_1', options=[('value_1', 'name_1'), ('value_2', 'name_2')])", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, context.getGui().getForms().size()); + assertTrue(context.getGui().getForms().get("checkbox_1") instanceof CheckBox); + CheckBox checkbox = (CheckBox) context.getGui().getForms().get("checkbox_1"); + assertEquals("checkbox_1", checkbox.getName()); + assertEquals(2, checkbox.getOptions().length); + assertEquals("name_1", checkbox.getOptions()[0].getDisplayName()); + assertEquals("value_1", checkbox.getOptions()[0].getValue()); + + // Pandas DataFrame + context = getInterpreterContext(); + result = interpreter.interpret("import pandas as pd\ndf = pd.DataFrame({'id':[1,2,3], 'name':['a','b','c']})\nz.show(df)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals(InterpreterResult.Type.TABLE, interpreterResultMessages.get(0).getType()); + assertEquals("id\tname\n1\ta\n2\tb\n3\tc\n", interpreterResultMessages.get(0).getData()); + } + + private static InterpreterContext getInterpreterContext() { + return new InterpreterContext( + "noteId", + "paragraphId", + "replName", + "paragraphTitle", + "paragraphText", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + null, + null, + null, + new InterpreterOutput(null)); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java index 8b48b24..d649e89 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java @@ -53,6 +53,7 @@ public class PythonInterpreterMatplotlibTest implements InterpreterOutputListene Properties p = new Properties(); p.setProperty("zeppelin.python", "python"); p.setProperty("zeppelin.python.maxResult", "100"); + p.setProperty("zeppelin.python.useIPython", "false"); intpGroup = new InterpreterGroup(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java index f200a0a..9e918c0 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java @@ -73,9 +73,21 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener Properties p = new Properties(); p.setProperty("zeppelin.python", "python"); p.setProperty("zeppelin.python.maxResult", "100"); + p.setProperty("zeppelin.python.useIPython", "false"); intpGroup = new InterpreterGroup(); + out = new InterpreterOutput(this); + context = new InterpreterContext("note", "id", null, "title", "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("id"), + new LinkedList<InterpreterContextRunner>(), + out); + InterpreterContext.set(context); + python = new PythonInterpreter(p); python.setInterpreterGroup(intpGroup); python.open(); @@ -85,16 +97,7 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener intpGroup.put("note", Arrays.asList(python, sql)); - out = new InterpreterOutput(this); - context = new InterpreterContext("note", "id", null, "title", "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("id"), - new LinkedList<InterpreterContextRunner>(), - out); // to make sure python is running. InterpreterResult ret = python.interpret("\n", context); @@ -140,10 +143,10 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener ret = sql.interpret("select name, age from df2 where age < 40", context); //then - assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, out.getOutputAt(0).getType()); - assertTrue(new String(out.getOutputAt(0).toByteArray()).indexOf("moon\t33") > 0); - assertTrue(new String(out.getOutputAt(0).toByteArray()).indexOf("park\t34") > 0); + assertEquals(new String(out.getOutputAt(1).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals(new String(out.getOutputAt(1).toByteArray()), Type.TABLE, out.getOutputAt(1).getType()); + assertTrue(new String(out.getOutputAt(1).toByteArray()).indexOf("moon\t33") > 0); + assertTrue(new String(out.getOutputAt(1).toByteArray()).indexOf("park\t34") > 0); assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from df2", context).code()); } @@ -156,7 +159,6 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener //then assertNotNull("Interpreter returned 'null'", ret); assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code()); - assertTrue(out.toInterpreterResultMessage().size() == 0); } @Test @@ -176,10 +178,10 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener // then assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, out.getOutputAt(0).getType()); - assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("index_name")); - assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("nan")); - assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("6.7")); + assertEquals(new String(out.getOutputAt(1).toByteArray()), Type.TABLE, out.getOutputAt(1).getType()); + assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("index_name")); + assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("nan")); + assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("6.7")); } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java index 837626c..195935d 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java @@ -59,6 +59,7 @@ public class PythonInterpreterTest implements InterpreterOutputListener { Properties p = new Properties(); p.setProperty(ZEPPELIN_PYTHON, DEFAULT_ZEPPELIN_PYTHON); p.setProperty(MAX_RESULT, "1000"); + p.setProperty("zeppelin.python.useIPython", "false"); return p; } @@ -85,6 +86,7 @@ public class PythonInterpreterTest implements InterpreterOutputListener { new LocalResourcePool("id"), new LinkedList<InterpreterContextRunner>(), out); + InterpreterContext.set(context); pythonInterpreter.open(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/python/src/test/resources/log4j.properties b/python/src/test/resources/log4j.properties new file mode 100644 index 0000000..a8e2c44 --- /dev/null +++ b/python/src/test/resources/log4j.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n +#log4j.appender.stdout.layout.ConversionPattern= +#%5p [%t] (%F:%L) - %m%n +#%-4r [%t] %-5p %c %x - %m%n +# + +# Root logger option +log4j.rootLogger=INFO, stdout +log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG +log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml index d35f973..36e4a6c 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -72,6 +72,32 @@ </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-python</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>net.sf.py4j</groupId> + <artifactId>py4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-python</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>net.sf.py4j</groupId> + <artifactId>py4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -95,12 +121,6 @@ <scope>provided</scope> </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - <!-- Aether :: maven dependency resolution --> <dependency> <groupId>org.apache.maven</groupId> @@ -355,6 +375,12 @@ <exclude>**/SparkRInterpreterTest.java</exclude> <exclude>${pyspark.test.exclude}</exclude> </excludes> + <environmentVariables> + <!-- local pyspark execution needs PYTHONPATH otherwise python daemon in executor side will fail + e.g. sc.range(1,10).sum() + --> + <PYTHONPATH>../interpreter/spark/pyspark/pyspark.zip:../interpreter/spark/pyspark/py4j-${spark.py4j.version}-src.zip:../interpreter/lib/python</PYTHONPATH> + </environmentVariables> </configuration> </plugin> @@ -379,6 +405,19 @@ <resource>reference.conf</resource> </transformer> </transformers> + + <relocations> + <!-- shade guava and proto-buf, because it might conflict with those of spark --> + <relocation> + <pattern>com.google</pattern> + <shadedPattern>org.apache.zeppelin.com.google</shadedPattern> + </relocation> + <!-- shade netty, because it might conflict with that of spark--> + <relocation> + <pattern>io.netty</pattern> + <shadedPattern>org.apache.zeppelin.io.netty</shadedPattern> + </relocation> + </relocations> </configuration> <executions> <execution> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java new file mode 100644 index 0000000..f1b1435 --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.WrappedInterpreter; +import org.apache.zeppelin.python.IPythonInterpreter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * PySparkInterpreter which use IPython underlying. + */ +public class IPySparkInterpreter extends IPythonInterpreter { + + private static final Logger LOGGER = LoggerFactory.getLogger(IPySparkInterpreter.class); + + private SparkInterpreter sparkInterpreter; + + public IPySparkInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + getProperty().setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property)); + sparkInterpreter = getSparkInterpreter(); + SparkConf conf = sparkInterpreter.getSparkContext().getConf(); + String additionalPythonPath = conf.get("spark.submit.pyFiles").replaceAll(",", ":") + + ":../interpreter/lib/python"; + setAdditionalPythonPath(additionalPythonPath); + setAdditionalPythonInitFile("python/zeppelin_ipyspark.py"); + super.open(); + } + + private SparkInterpreter getSparkInterpreter() { + LazyOpenInterpreter lazy = null; + SparkInterpreter spark = null; + Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); + + while (p instanceof WrappedInterpreter) { + if (p instanceof LazyOpenInterpreter) { + lazy = (LazyOpenInterpreter) p; + } + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + spark = (SparkInterpreter) p; + + if (lazy != null) { + lazy.open(); + } + return spark; + } + + @Override + public void cancel(InterpreterContext context) { + super.cancel(context); + sparkInterpreter.cancel(context); + } + + @Override + public void close() { + super.close(); + if (sparkInterpreter != null) { + sparkInterpreter.close(); + } + } + + @Override + public int getProgress(InterpreterContext context) { + return sparkInterpreter.getProgress(context); + } + + public boolean isSpark2() { + return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0); + } + + public JavaSparkContext getJavaSparkContext() { + return sparkInterpreter.getJavaSparkContext(); + } + + public Object getSQLContext() { + return sparkInterpreter.getSQLContext(); + } + + public Object getSparkSession() { + return sparkInterpreter.getSparkSession(); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 28910b2..e65df22 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -76,6 +76,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand private static final int MAX_TIMEOUT_SEC = 10; private long pythonPid; + private IPySparkInterpreter iPySparkInterpreter; + public PySparkInterpreter(Properties property) { super(property); @@ -111,6 +113,37 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand @Override public void open() { + // try IPySparkInterpreter first + iPySparkInterpreter = getIPySparkInterpreter(); + if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true") && + iPySparkInterpreter.checkIPythonPrerequisite()) { + try { + iPySparkInterpreter.open(); + if (InterpreterContext.get() != null) { + // don't print it when it is in testing, just for easy output check in test. + InterpreterContext.get().out.write(("IPython is available, " + + "use IPython for PySparkInterpreter\n") + .getBytes()); + } + LOGGER.info("Use IPySparkInterpreter to replace PySparkInterpreter"); + return; + } catch (Exception e) { + LOGGER.warn("Fail to open IPySparkInterpreter", e); + } + } + iPySparkInterpreter = null; + + if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true")) { + // don't print it when it is in testing, just for easy output check in test. + try { + InterpreterContext.get().out.write(("IPython is not available, " + + "use the native PySparkInterpreter\n") + .getBytes()); + } catch (IOException e) { + LOGGER.warn("Fail to write InterpreterOutput", e); + } + } + // Add matplotlib display hook InterpreterGroup intpGroup = getInterpreterGroup(); if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) { @@ -190,9 +223,24 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } + LOGGER.debug("PYTHONPATH: " + env.get("PYTHONPATH")); return env; } + // Run python shell + // Choose python in the order of + // PYSPARK_DRIVER_PYTHON > PYSPARK_PYTHON > zeppelin.pyspark.python + public static String getPythonExec(Properties properties) { + String pythonExec = properties.getProperty("zeppelin.pyspark.python", "python"); + if (System.getenv("PYSPARK_PYTHON") != null) { + pythonExec = System.getenv("PYSPARK_PYTHON"); + } + if (System.getenv("PYSPARK_DRIVER_PYTHON") != null) { + pythonExec = System.getenv("PYSPARK_DRIVER_PYTHON"); + } + return pythonExec; + } + private void createGatewayServerAndStartScript() { // create python script createPythonScript(); @@ -202,16 +250,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand gatewayServer = new GatewayServer(this, port); gatewayServer.start(); - // Run python shell - // Choose python in the order of - // PYSPARK_DRIVER_PYTHON > PYSPARK_PYTHON > zeppelin.pyspark.python - String pythonExec = getProperty("zeppelin.pyspark.python"); - if (System.getenv("PYSPARK_PYTHON") != null) { - pythonExec = System.getenv("PYSPARK_PYTHON"); - } - if (System.getenv("PYSPARK_DRIVER_PYTHON") != null) { - pythonExec = System.getenv("PYSPARK_DRIVER_PYTHON"); - } + String pythonExec = getPythonExec(property); CommandLine cmd = CommandLine.parse(pythonExec); cmd.addArgument(scriptPath, false); cmd.addArgument(Integer.toString(port), false); @@ -263,6 +302,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand @Override public void close() { + if (iPySparkInterpreter != null) { + iPySparkInterpreter.close(); + return; + } executor.getWatchdog().destroyProcess(); new File(scriptPath).delete(); gatewayServer.shutdown(); @@ -353,6 +396,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand + sparkInterpreter.getSparkVersion().toString() + " is not supported"); } + if (iPySparkInterpreter != null) { + return iPySparkInterpreter.interpret(st, context); + } + if (!pythonscriptRunning) { return new InterpreterResult(Code.ERROR, "python process not running" + outputStream.toString()); @@ -448,6 +495,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand @Override public void cancel(InterpreterContext context) { + if (iPySparkInterpreter != null) { + iPySparkInterpreter.cancel(context); + return; + } SparkInterpreter sparkInterpreter = getSparkInterpreter(); sparkInterpreter.cancel(context); try { @@ -464,6 +515,9 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand @Override public int getProgress(InterpreterContext context) { + if (iPySparkInterpreter != null) { + return iPySparkInterpreter.getProgress(context); + } SparkInterpreter sparkInterpreter = getSparkInterpreter(); return sparkInterpreter.getProgress(context); } @@ -472,6 +526,9 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand @Override public List<InterpreterCompletion> completion(String buf, int cursor, InterpreterContext interpreterContext) { + if (iPySparkInterpreter != null) { + return iPySparkInterpreter.completion(buf, cursor, interpreterContext); + } if (buf.length() < cursor) { cursor = buf.length(); } @@ -588,6 +645,21 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand return spark; } + private IPySparkInterpreter getIPySparkInterpreter() { + LazyOpenInterpreter lazy = null; + IPySparkInterpreter iPySpark = null; + Interpreter p = getInterpreterInTheSameSessionByClassName(IPySparkInterpreter.class.getName()); + + while (p instanceof WrappedInterpreter) { + if (p instanceof LazyOpenInterpreter) { + lazy = (LazyOpenInterpreter) p; + } + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + iPySpark = (IPySparkInterpreter) p; + return iPySpark; + } + public SparkZeppelinContext getZeppelinContext() { SparkInterpreter sparkIntp = getSparkInterpreter(); if (sparkIntp != null) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/spark/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/interpreter-setting.json b/spark/src/main/resources/interpreter-setting.json index e96265f..d646805 100644 --- a/spark/src/main/resources/interpreter-setting.json +++ b/spark/src/main/resources/interpreter-setting.json @@ -149,11 +149,28 @@ "defaultValue": "python", "description": "Python command to run pyspark with", "type": "string" + }, + "zeppelin.spark.useIPython": { + "envName": null, + "propertyName": "zeppelin.spark.useIPython", + "defaultValue": true, + "description": "whether use IPython when it is available", + "type": "checkbox" } }, "editor": { "language": "python", "editOnDblClick": false } + }, + { + "group": "spark", + "name": "ipyspark", + "className": "org.apache.zeppelin.spark.IPySparkInterpreter", + "properties": {}, + "editor": { + "language": "python", + "editOnDblClick": false + } } ]