http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/main/resources/grpc/python/ipython_pb2.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/grpc/python/ipython_pb2.py 
b/python/src/main/resources/grpc/python/ipython_pb2.py
new file mode 100644
index 0000000..eca3dfe
--- /dev/null
+++ b/python/src/main/resources/grpc/python/ipython_pb2.py
@@ -0,0 +1,751 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: ipython.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf.internal import enum_type_wrapper
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf import descriptor_pb2
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+  name='ipython.proto',
+  package='ipython',
+  syntax='proto3',
+  
serialized_pb=_b('\n\ripython.proto\x12\x07ipython\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01
 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 
\x01(\x0e\x32\x16.ipython.ExecuteStatus\x12!\n\x04type\x18\x02 
\x01(\x0e\x32\x13.ipython.OutputType\x12\x0e\n\x06output\x18\x03 
\x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01
 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 
\x01(\x05\"%\n\x12\x43ompletionResponse\x12\x0f\n\x07matches\x18\x01 
\x03(\t\"\x0f\n\rStatusRequest\"8\n\x0eStatusResponse\x12&\n\x06status\x18\x01 
\x01(\x0e\x32\x16.ipython.IPythonStatus\"\r\n\x0bStopRequest\"\x0e\n\x0cStopResponse*\'\n\rExecuteStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01**\n\rIPythonStatus\x12\x0c\n\x08STARTING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01*!\n\nOutputType\x12\x08\n\x04TEXT\x10\x00\x12\t\n\x05IMAGE\x10\x01\x32\xc3\x02\n\x07IPython\x12@\n\x07\x65xecute\x12\x17.ipython.ExecuteRequest\x1a\x1
 
8.ipython.ExecuteResponse\"\x00\x30\x01\x12\x45\n\x08\x63omplete\x12\x1a.ipython.CompletionRequest\x1a\x1b.ipython.CompletionResponse\"\x00\x12;\n\x06\x63\x61ncel\x12\x16.ipython.CancelRequest\x1a\x17.ipython.CancelResponse\"\x00\x12;\n\x06status\x12\x16.ipython.StatusRequest\x1a\x17.ipython.StatusResponse\"\x00\x12\x35\n\x04stop\x12\x14.ipython.StopRequest\x1a\x15.ipython.StopResponse\"\x00\x42<\n
 
org.apache.zeppelin.python.protoB\x0cIPythonProtoP\x01\xa2\x02\x07IPythonb\x06proto3')
+)
+
+_EXECUTESTATUS = _descriptor.EnumDescriptor(
+  name='ExecuteStatus',
+  full_name='ipython.ExecuteStatus',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='SUCCESS', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='ERROR', index=1, number=1,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=399,
+  serialized_end=438,
+)
+_sym_db.RegisterEnumDescriptor(_EXECUTESTATUS)
+
+ExecuteStatus = enum_type_wrapper.EnumTypeWrapper(_EXECUTESTATUS)
+_IPYTHONSTATUS = _descriptor.EnumDescriptor(
+  name='IPythonStatus',
+  full_name='ipython.IPythonStatus',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='STARTING', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='RUNNING', index=1, number=1,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=440,
+  serialized_end=482,
+)
+_sym_db.RegisterEnumDescriptor(_IPYTHONSTATUS)
+
+IPythonStatus = enum_type_wrapper.EnumTypeWrapper(_IPYTHONSTATUS)
+_OUTPUTTYPE = _descriptor.EnumDescriptor(
+  name='OutputType',
+  full_name='ipython.OutputType',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='TEXT', index=0, number=0,
+      options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='IMAGE', index=1, number=1,
+      options=None,
+      type=None),
+  ],
+  containing_type=None,
+  options=None,
+  serialized_start=484,
+  serialized_end=517,
+)
+_sym_db.RegisterEnumDescriptor(_OUTPUTTYPE)
+
+OutputType = enum_type_wrapper.EnumTypeWrapper(_OUTPUTTYPE)
+SUCCESS = 0
+ERROR = 1
+STARTING = 0
+RUNNING = 1
+TEXT = 0
+IMAGE = 1
+
+
+
+_EXECUTEREQUEST = _descriptor.Descriptor(
+  name='ExecuteRequest',
+  full_name='ipython.ExecuteRequest',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='code', full_name='ipython.ExecuteRequest.code', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=26,
+  serialized_end=56,
+)
+
+
+_EXECUTERESPONSE = _descriptor.Descriptor(
+  name='ExecuteResponse',
+  full_name='ipython.ExecuteResponse',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='status', full_name='ipython.ExecuteResponse.status', index=0,
+      number=1, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='type', full_name='ipython.ExecuteResponse.type', index=1,
+      number=2, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='output', full_name='ipython.ExecuteResponse.output', index=2,
+      number=3, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=58,
+  serialized_end=166,
+)
+
+
+_CANCELREQUEST = _descriptor.Descriptor(
+  name='CancelRequest',
+  full_name='ipython.CancelRequest',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=168,
+  serialized_end=183,
+)
+
+
+_CANCELRESPONSE = _descriptor.Descriptor(
+  name='CancelResponse',
+  full_name='ipython.CancelResponse',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=185,
+  serialized_end=201,
+)
+
+
+_COMPLETIONREQUEST = _descriptor.Descriptor(
+  name='CompletionRequest',
+  full_name='ipython.CompletionRequest',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='code', full_name='ipython.CompletionRequest.code', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='cursor', full_name='ipython.CompletionRequest.cursor', index=1,
+      number=2, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=203,
+  serialized_end=252,
+)
+
+
+_COMPLETIONRESPONSE = _descriptor.Descriptor(
+  name='CompletionResponse',
+  full_name='ipython.CompletionResponse',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='matches', full_name='ipython.CompletionResponse.matches', index=0,
+      number=1, type=9, cpp_type=9, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=254,
+  serialized_end=291,
+)
+
+
+_STATUSREQUEST = _descriptor.Descriptor(
+  name='StatusRequest',
+  full_name='ipython.StatusRequest',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=293,
+  serialized_end=308,
+)
+
+
+_STATUSRESPONSE = _descriptor.Descriptor(
+  name='StatusResponse',
+  full_name='ipython.StatusResponse',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='status', full_name='ipython.StatusResponse.status', index=0,
+      number=1, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=310,
+  serialized_end=366,
+)
+
+
+_STOPREQUEST = _descriptor.Descriptor(
+  name='StopRequest',
+  full_name='ipython.StopRequest',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=368,
+  serialized_end=381,
+)
+
+
+_STOPRESPONSE = _descriptor.Descriptor(
+  name='StopResponse',
+  full_name='ipython.StopResponse',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=383,
+  serialized_end=397,
+)
+
+_EXECUTERESPONSE.fields_by_name['status'].enum_type = _EXECUTESTATUS
+_EXECUTERESPONSE.fields_by_name['type'].enum_type = _OUTPUTTYPE
+_STATUSRESPONSE.fields_by_name['status'].enum_type = _IPYTHONSTATUS
+DESCRIPTOR.message_types_by_name['ExecuteRequest'] = _EXECUTEREQUEST
+DESCRIPTOR.message_types_by_name['ExecuteResponse'] = _EXECUTERESPONSE
+DESCRIPTOR.message_types_by_name['CancelRequest'] = _CANCELREQUEST
+DESCRIPTOR.message_types_by_name['CancelResponse'] = _CANCELRESPONSE
+DESCRIPTOR.message_types_by_name['CompletionRequest'] = _COMPLETIONREQUEST
+DESCRIPTOR.message_types_by_name['CompletionResponse'] = _COMPLETIONRESPONSE
+DESCRIPTOR.message_types_by_name['StatusRequest'] = _STATUSREQUEST
+DESCRIPTOR.message_types_by_name['StatusResponse'] = _STATUSRESPONSE
+DESCRIPTOR.message_types_by_name['StopRequest'] = _STOPREQUEST
+DESCRIPTOR.message_types_by_name['StopResponse'] = _STOPRESPONSE
+DESCRIPTOR.enum_types_by_name['ExecuteStatus'] = _EXECUTESTATUS
+DESCRIPTOR.enum_types_by_name['IPythonStatus'] = _IPYTHONSTATUS
+DESCRIPTOR.enum_types_by_name['OutputType'] = _OUTPUTTYPE
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+ExecuteRequest = _reflection.GeneratedProtocolMessageType('ExecuteRequest', 
(_message.Message,), dict(
+  DESCRIPTOR = _EXECUTEREQUEST,
+  __module__ = 'ipython_pb2'
+  # @@protoc_insertion_point(class_scope:ipython.ExecuteRequest)
+  ))
+_sym_db.RegisterMessage(ExecuteRequest)
+
+ExecuteResponse = _reflection.GeneratedProtocolMessageType('ExecuteResponse', 
(_message.Message,), dict(
+  DESCRIPTOR = _EXECUTERESPONSE,
+  __module__ = 'ipython_pb2'
+  # @@protoc_insertion_point(class_scope:ipython.ExecuteResponse)
+  ))
+_sym_db.RegisterMessage(ExecuteResponse)
+
+CancelRequest = _reflection.GeneratedProtocolMessageType('CancelRequest', 
(_message.Message,), dict(
+  DESCRIPTOR = _CANCELREQUEST,
+  __module__ = 'ipython_pb2'
+  # @@protoc_insertion_point(class_scope:ipython.CancelRequest)
+  ))
+_sym_db.RegisterMessage(CancelRequest)
+
+CancelResponse = _reflection.GeneratedProtocolMessageType('CancelResponse', 
(_message.Message,), dict(
+  DESCRIPTOR = _CANCELRESPONSE,
+  __module__ = 'ipython_pb2'
+  # @@protoc_insertion_point(class_scope:ipython.CancelResponse)
+  ))
+_sym_db.RegisterMessage(CancelResponse)
+
+CompletionRequest = 
_reflection.GeneratedProtocolMessageType('CompletionRequest', 
(_message.Message,), dict(
+  DESCRIPTOR = _COMPLETIONREQUEST,
+  __module__ = 'ipython_pb2'
+  # @@protoc_insertion_point(class_scope:ipython.CompletionRequest)
+  ))
+_sym_db.RegisterMessage(CompletionRequest)
+
+CompletionResponse = 
_reflection.GeneratedProtocolMessageType('CompletionResponse', 
(_message.Message,), dict(
+  DESCRIPTOR = _COMPLETIONRESPONSE,
+  __module__ = 'ipython_pb2'
+  # @@protoc_insertion_point(class_scope:ipython.CompletionResponse)
+  ))
+_sym_db.RegisterMessage(CompletionResponse)
+
+StatusRequest = _reflection.GeneratedProtocolMessageType('StatusRequest', 
(_message.Message,), dict(
+  DESCRIPTOR = _STATUSREQUEST,
+  __module__ = 'ipython_pb2'
+  # @@protoc_insertion_point(class_scope:ipython.StatusRequest)
+  ))
+_sym_db.RegisterMessage(StatusRequest)
+
+StatusResponse = _reflection.GeneratedProtocolMessageType('StatusResponse', 
(_message.Message,), dict(
+  DESCRIPTOR = _STATUSRESPONSE,
+  __module__ = 'ipython_pb2'
+  # @@protoc_insertion_point(class_scope:ipython.StatusResponse)
+  ))
+_sym_db.RegisterMessage(StatusResponse)
+
+StopRequest = _reflection.GeneratedProtocolMessageType('StopRequest', 
(_message.Message,), dict(
+  DESCRIPTOR = _STOPREQUEST,
+  __module__ = 'ipython_pb2'
+  # @@protoc_insertion_point(class_scope:ipython.StopRequest)
+  ))
+_sym_db.RegisterMessage(StopRequest)
+
+StopResponse = _reflection.GeneratedProtocolMessageType('StopResponse', 
(_message.Message,), dict(
+  DESCRIPTOR = _STOPRESPONSE,
+  __module__ = 'ipython_pb2'
+  # @@protoc_insertion_point(class_scope:ipython.StopResponse)
+  ))
+_sym_db.RegisterMessage(StopResponse)
+
+
+DESCRIPTOR.has_options = True
+DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), 
_b('\n 
org.apache.zeppelin.python.protoB\014IPythonProtoP\001\242\002\007IPython'))
+try:
+  # THESE ELEMENTS WILL BE DEPRECATED.
+  # Please use the generated *_pb2_grpc.py files instead.
+  import grpc
+  from grpc.beta import implementations as beta_implementations
+  from grpc.beta import interfaces as beta_interfaces
+  from grpc.framework.common import cardinality
+  from grpc.framework.interfaces.face import utilities as face_utilities
+
+
+  class IPythonStub(object):
+    """The IPython service definition.
+    """
+
+    def __init__(self, channel):
+      """Constructor.
+
+      Args:
+        channel: A grpc.Channel.
+      """
+      self.execute = channel.unary_stream(
+          '/ipython.IPython/execute',
+          request_serializer=ExecuteRequest.SerializeToString,
+          response_deserializer=ExecuteResponse.FromString,
+          )
+      self.complete = channel.unary_unary(
+          '/ipython.IPython/complete',
+          request_serializer=CompletionRequest.SerializeToString,
+          response_deserializer=CompletionResponse.FromString,
+          )
+      self.cancel = channel.unary_unary(
+          '/ipython.IPython/cancel',
+          request_serializer=CancelRequest.SerializeToString,
+          response_deserializer=CancelResponse.FromString,
+          )
+      self.status = channel.unary_unary(
+          '/ipython.IPython/status',
+          request_serializer=StatusRequest.SerializeToString,
+          response_deserializer=StatusResponse.FromString,
+          )
+      self.stop = channel.unary_unary(
+          '/ipython.IPython/stop',
+          request_serializer=StopRequest.SerializeToString,
+          response_deserializer=StopResponse.FromString,
+          )
+
+
+  class IPythonServicer(object):
+    """The IPython service definition.
+    """
+
+    def execute(self, request, context):
+      """Sends code
+      """
+      context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+      context.set_details('Method not implemented!')
+      raise NotImplementedError('Method not implemented!')
+
+    def complete(self, request, context):
+      """Get completion
+      """
+      context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+      context.set_details('Method not implemented!')
+      raise NotImplementedError('Method not implemented!')
+
+    def cancel(self, request, context):
+      """Cancel the running statement
+      """
+      context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+      context.set_details('Method not implemented!')
+      raise NotImplementedError('Method not implemented!')
+
+    def status(self, request, context):
+      """Get ipython kernel status
+      """
+      context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+      context.set_details('Method not implemented!')
+      raise NotImplementedError('Method not implemented!')
+
+    def stop(self, request, context):
+      # missing associated documentation comment in .proto file
+      pass
+      context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+      context.set_details('Method not implemented!')
+      raise NotImplementedError('Method not implemented!')
+
+
+  def add_IPythonServicer_to_server(servicer, server):
+    rpc_method_handlers = {
+        'execute': grpc.unary_stream_rpc_method_handler(
+            servicer.execute,
+            request_deserializer=ExecuteRequest.FromString,
+            response_serializer=ExecuteResponse.SerializeToString,
+        ),
+        'complete': grpc.unary_unary_rpc_method_handler(
+            servicer.complete,
+            request_deserializer=CompletionRequest.FromString,
+            response_serializer=CompletionResponse.SerializeToString,
+        ),
+        'cancel': grpc.unary_unary_rpc_method_handler(
+            servicer.cancel,
+            request_deserializer=CancelRequest.FromString,
+            response_serializer=CancelResponse.SerializeToString,
+        ),
+        'status': grpc.unary_unary_rpc_method_handler(
+            servicer.status,
+            request_deserializer=StatusRequest.FromString,
+            response_serializer=StatusResponse.SerializeToString,
+        ),
+        'stop': grpc.unary_unary_rpc_method_handler(
+            servicer.stop,
+            request_deserializer=StopRequest.FromString,
+            response_serializer=StopResponse.SerializeToString,
+        ),
+    }
+    generic_handler = grpc.method_handlers_generic_handler(
+        'ipython.IPython', rpc_method_handlers)
+    server.add_generic_rpc_handlers((generic_handler,))
+
+
+  class BetaIPythonServicer(object):
+    """The Beta API is deprecated for 0.15.0 and later.
+
+    It is recommended to use the GA API (classes and functions in this
+    file not marked beta) for all further purposes. This class was generated
+    only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0."""
+    """The IPython service definition.
+    """
+    def execute(self, request, context):
+      """Sends code
+      """
+      context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
+    def complete(self, request, context):
+      """Get completion
+      """
+      context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
+    def cancel(self, request, context):
+      """Cancel the running statement
+      """
+      context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
+    def status(self, request, context):
+      """Get ipython kernel status
+      """
+      context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
+    def stop(self, request, context):
+      # missing associated documentation comment in .proto file
+      pass
+      context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
+
+
+  class BetaIPythonStub(object):
+    """The Beta API is deprecated for 0.15.0 and later.
+
+    It is recommended to use the GA API (classes and functions in this
+    file not marked beta) for all further purposes. This class was generated
+    only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0."""
+    """The IPython service definition.
+    """
+    def execute(self, request, timeout, metadata=None, with_call=False, 
protocol_options=None):
+      """Sends code
+      """
+      raise NotImplementedError()
+    def complete(self, request, timeout, metadata=None, with_call=False, 
protocol_options=None):
+      """Get completion
+      """
+      raise NotImplementedError()
+    complete.future = None
+    def cancel(self, request, timeout, metadata=None, with_call=False, 
protocol_options=None):
+      """Cancel the running statement
+      """
+      raise NotImplementedError()
+    cancel.future = None
+    def status(self, request, timeout, metadata=None, with_call=False, 
protocol_options=None):
+      """Get ipython kernel status
+      """
+      raise NotImplementedError()
+    status.future = None
+    def stop(self, request, timeout, metadata=None, with_call=False, 
protocol_options=None):
+      # missing associated documentation comment in .proto file
+      pass
+      raise NotImplementedError()
+    stop.future = None
+
+
+  def beta_create_IPython_server(servicer, pool=None, pool_size=None, 
default_timeout=None, maximum_timeout=None):
+    """The Beta API is deprecated for 0.15.0 and later.
+
+    It is recommended to use the GA API (classes and functions in this
+    file not marked beta) for all further purposes. This function was
+    generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0"""
+    request_deserializers = {
+      ('ipython.IPython', 'cancel'): CancelRequest.FromString,
+      ('ipython.IPython', 'complete'): CompletionRequest.FromString,
+      ('ipython.IPython', 'execute'): ExecuteRequest.FromString,
+      ('ipython.IPython', 'status'): StatusRequest.FromString,
+      ('ipython.IPython', 'stop'): StopRequest.FromString,
+    }
+    response_serializers = {
+      ('ipython.IPython', 'cancel'): CancelResponse.SerializeToString,
+      ('ipython.IPython', 'complete'): CompletionResponse.SerializeToString,
+      ('ipython.IPython', 'execute'): ExecuteResponse.SerializeToString,
+      ('ipython.IPython', 'status'): StatusResponse.SerializeToString,
+      ('ipython.IPython', 'stop'): StopResponse.SerializeToString,
+    }
+    method_implementations = {
+      ('ipython.IPython', 'cancel'): 
face_utilities.unary_unary_inline(servicer.cancel),
+      ('ipython.IPython', 'complete'): 
face_utilities.unary_unary_inline(servicer.complete),
+      ('ipython.IPython', 'execute'): 
face_utilities.unary_stream_inline(servicer.execute),
+      ('ipython.IPython', 'status'): 
face_utilities.unary_unary_inline(servicer.status),
+      ('ipython.IPython', 'stop'): 
face_utilities.unary_unary_inline(servicer.stop),
+    }
+    server_options = 
beta_implementations.server_options(request_deserializers=request_deserializers,
 response_serializers=response_serializers, thread_pool=pool, 
thread_pool_size=pool_size, default_timeout=default_timeout, 
maximum_timeout=maximum_timeout)
+    return beta_implementations.server(method_implementations, 
options=server_options)
+
+
+  def beta_create_IPython_stub(channel, host=None, metadata_transformer=None, 
pool=None, pool_size=None):
+    """The Beta API is deprecated for 0.15.0 and later.
+
+    It is recommended to use the GA API (classes and functions in this
+    file not marked beta) for all further purposes. This function was
+    generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0"""
+    request_serializers = {
+      ('ipython.IPython', 'cancel'): CancelRequest.SerializeToString,
+      ('ipython.IPython', 'complete'): CompletionRequest.SerializeToString,
+      ('ipython.IPython', 'execute'): ExecuteRequest.SerializeToString,
+      ('ipython.IPython', 'status'): StatusRequest.SerializeToString,
+      ('ipython.IPython', 'stop'): StopRequest.SerializeToString,
+    }
+    response_deserializers = {
+      ('ipython.IPython', 'cancel'): CancelResponse.FromString,
+      ('ipython.IPython', 'complete'): CompletionResponse.FromString,
+      ('ipython.IPython', 'execute'): ExecuteResponse.FromString,
+      ('ipython.IPython', 'status'): StatusResponse.FromString,
+      ('ipython.IPython', 'stop'): StopResponse.FromString,
+    }
+    cardinalities = {
+      'cancel': cardinality.Cardinality.UNARY_UNARY,
+      'complete': cardinality.Cardinality.UNARY_UNARY,
+      'execute': cardinality.Cardinality.UNARY_STREAM,
+      'status': cardinality.Cardinality.UNARY_UNARY,
+      'stop': cardinality.Cardinality.UNARY_UNARY,
+    }
+    stub_options = beta_implementations.stub_options(host=host, 
metadata_transformer=metadata_transformer, 
request_serializers=request_serializers, 
response_deserializers=response_deserializers, thread_pool=pool, 
thread_pool_size=pool_size)
+    return beta_implementations.dynamic_stub(channel, 'ipython.IPython', 
cardinalities, options=stub_options)
+except ImportError:
+  pass
+# @@protoc_insertion_point(module_scope)

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/main/resources/grpc/python/ipython_pb2_grpc.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/grpc/python/ipython_pb2_grpc.py 
b/python/src/main/resources/grpc/python/ipython_pb2_grpc.py
new file mode 100644
index 0000000..a590319
--- /dev/null
+++ b/python/src/main/resources/grpc/python/ipython_pb2_grpc.py
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+import grpc
+
+import ipython_pb2 as ipython__pb2
+
+
+class IPythonStub(object):
+  """The IPython service definition.
+  """
+
+  def __init__(self, channel):
+    """Constructor.
+
+    Args:
+      channel: A grpc.Channel.
+    """
+    self.execute = channel.unary_stream(
+        '/ipython.IPython/execute',
+        request_serializer=ipython__pb2.ExecuteRequest.SerializeToString,
+        response_deserializer=ipython__pb2.ExecuteResponse.FromString,
+        )
+    self.complete = channel.unary_unary(
+        '/ipython.IPython/complete',
+        request_serializer=ipython__pb2.CompletionRequest.SerializeToString,
+        response_deserializer=ipython__pb2.CompletionResponse.FromString,
+        )
+    self.cancel = channel.unary_unary(
+        '/ipython.IPython/cancel',
+        request_serializer=ipython__pb2.CancelRequest.SerializeToString,
+        response_deserializer=ipython__pb2.CancelResponse.FromString,
+        )
+    self.status = channel.unary_unary(
+        '/ipython.IPython/status',
+        request_serializer=ipython__pb2.StatusRequest.SerializeToString,
+        response_deserializer=ipython__pb2.StatusResponse.FromString,
+        )
+    self.stop = channel.unary_unary(
+        '/ipython.IPython/stop',
+        request_serializer=ipython__pb2.StopRequest.SerializeToString,
+        response_deserializer=ipython__pb2.StopResponse.FromString,
+        )
+
+
+class IPythonServicer(object):
+  """The IPython service definition.
+  """
+
+  def execute(self, request, context):
+    """Sends code
+    """
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+  def complete(self, request, context):
+    """Get completion
+    """
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+  def cancel(self, request, context):
+    """Cancel the running statement
+    """
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+  def status(self, request, context):
+    """Get ipython kernel status
+    """
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+  def stop(self, request, context):
+    # missing associated documentation comment in .proto file
+    pass
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
+
+def add_IPythonServicer_to_server(servicer, server):
+  rpc_method_handlers = {
+      'execute': grpc.unary_stream_rpc_method_handler(
+          servicer.execute,
+          request_deserializer=ipython__pb2.ExecuteRequest.FromString,
+          response_serializer=ipython__pb2.ExecuteResponse.SerializeToString,
+      ),
+      'complete': grpc.unary_unary_rpc_method_handler(
+          servicer.complete,
+          request_deserializer=ipython__pb2.CompletionRequest.FromString,
+          
response_serializer=ipython__pb2.CompletionResponse.SerializeToString,
+      ),
+      'cancel': grpc.unary_unary_rpc_method_handler(
+          servicer.cancel,
+          request_deserializer=ipython__pb2.CancelRequest.FromString,
+          response_serializer=ipython__pb2.CancelResponse.SerializeToString,
+      ),
+      'status': grpc.unary_unary_rpc_method_handler(
+          servicer.status,
+          request_deserializer=ipython__pb2.StatusRequest.FromString,
+          response_serializer=ipython__pb2.StatusResponse.SerializeToString,
+      ),
+      'stop': grpc.unary_unary_rpc_method_handler(
+          servicer.stop,
+          request_deserializer=ipython__pb2.StopRequest.FromString,
+          response_serializer=ipython__pb2.StopResponse.SerializeToString,
+      ),
+  }
+  generic_handler = grpc.method_handlers_generic_handler(
+      'ipython.IPython', rpc_method_handlers)
+  server.add_generic_rpc_handlers((generic_handler,))

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/main/resources/grpc/python/ipython_server.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/grpc/python/ipython_server.py 
b/python/src/main/resources/grpc/python/ipython_server.py
new file mode 100644
index 0000000..1d92766
--- /dev/null
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -0,0 +1,155 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import jupyter_client
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+import ipython_pb2
+import ipython_pb2_grpc
+
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+
+
+is_py2 = sys.version[0] == '2'
+if is_py2:
+    import Queue as queue
+else:
+    import queue as queue
+
+
+TIMEOUT = 30
+
+class IPython(ipython_pb2_grpc.IPythonServicer):
+
+    def __init__(self, server):
+        self._status = ipython_pb2.STARTING
+        self._server = server
+
+    def start(self):
+        print("starting...")
+        sys.stdout.flush()
+        self._km, self._kc = 
jupyter_client.manager.start_new_kernel(kernel_name='python')
+        self._status = ipython_pb2.RUNNING
+
+    def execute(self, request, context):
+        print("execute code: " + request.code)
+        sys.stdout.flush()
+        stdout_queue = queue.Queue(maxsize = 10)
+        stderr_queue = queue.Queue(maxsize = 10)
+        image_queue = queue.Queue(maxsize = 5)
+
+        def _output_hook(msg):
+            msg_type = msg['header']['msg_type']
+            content = msg['content']
+            if msg_type == 'stream':
+                stdout_queue.put(content['text'])
+            elif msg_type in ('display_data', 'execute_result'):
+                stdout_queue.put(content['data'].get('text/plain', ''))
+                if 'image/png' in content['data']:
+                    image_queue.put(content['data']['image/png'])
+            elif msg_type == 'error':
+                stderr_queue.put('\n'.join(content['traceback']))
+
+
+        payload_reply = []
+        def execute_worker():
+            reply = self._kc.execute_interactive(request.code,
+                                            output_hook=_output_hook,
+                                            timeout=TIMEOUT)
+            payload_reply.append(reply)
+
+        t = threading.Thread(name="ConsumerThread", target=execute_worker)
+        t.start()
+
+        while t.is_alive():
+            while not stdout_queue.empty():
+                output = stdout_queue.get()
+                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+                                                  type=ipython_pb2.TEXT,
+                                                  output=output)
+            while not stderr_queue.empty():
+                output = stderr_queue.get()
+                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
+                                                  type=ipython_pb2.TEXT,
+                                                  output=output)
+            while not image_queue.empty():
+                output = image_queue.get()
+                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+                                                  type=ipython_pb2.IMAGE,
+                                                  output=output)
+
+        while not stdout_queue.empty():
+            output = stdout_queue.get()
+            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+                                              type=ipython_pb2.TEXT,
+                                              output=output)
+        while not stderr_queue.empty():
+            output = stderr_queue.get()
+            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
+                                              type=ipython_pb2.TEXT,
+                                              output=output)
+        while not image_queue.empty():
+            output = image_queue.get()
+            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+                                              type=ipython_pb2.IMAGE,
+                                              output=output)
+
+        if payload_reply:
+            result = []
+            for payload in payload_reply[0]['content']['payload']:
+                if payload['data']['text/plain']:
+                    result.append(payload['data']['text/plain'])
+            if result:
+                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
+                                                  type=ipython_pb2.TEXT,
+                                                  output='\n'.join(result))
+
+    def cancel(self, request, context):
+        self._km.interrupt_kernel()
+        return ipython_pb2.CancelResponse()
+
+    def complete(self, request, context):
+        reply = self._kc.complete(request.code, request.cursor, reply=True, 
timeout=TIMEOUT)
+        return 
ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
+
+    def status(self, request, context):
+        return ipython_pb2.StatusResponse(status = self._status)
+
+    def stop(self, request, context):
+        self._server.stop(0)
+        sys.exit(0)
+
+
+def serve(port):
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    ipython = IPython(server)
+    ipython_pb2_grpc.add_IPythonServicer_to_server(ipython, server)
+    server.add_insecure_port('[::]:' + port)
+    server.start()
+    ipython.start()
+    try:
+        while True:
+            time.sleep(_ONE_DAY_IN_SECONDS)
+    except KeyboardInterrupt:
+        server.stop(0)
+
+if __name__ == '__main__':
+    serve(sys.argv[1])

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/main/resources/grpc/python/zeppelin_python.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/grpc/python/zeppelin_python.py 
b/python/src/main/resources/grpc/python/zeppelin_python.py
new file mode 100644
index 0000000..0f5638f
--- /dev/null
+++ b/python/src/main/resources/grpc/python/zeppelin_python.py
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from py4j.java_gateway import java_import, JavaGateway, GatewayClient
+
+from io import BytesIO
+try:
+  from StringIO import StringIO
+except ImportError:
+  from io import StringIO
+
+class PyZeppelinContext(object):
+  """ A context impl that uses Py4j to communicate to JVM
+  """
+
+  def __init__(self, z):
+    self.z = z
+    self.paramOption = 
gateway.jvm.org.apache.zeppelin.display.ui.OptionInput.ParamOption
+    self.javaList = gateway.jvm.java.util.ArrayList
+    self.max_result = z.getMaxResult()
+
+  def input(self, name, defaultValue=""):
+    return self.z.getGui().input(name, defaultValue)
+
+  def select(self, name, options, defaultValue=""):
+    javaOptions = gateway.new_array(self.paramOption, len(options))
+    i = 0
+    for tuple in options:
+      javaOptions[i] = self.paramOption(tuple[0], tuple[1])
+      i += 1
+    return self.z.getGui().select(name, defaultValue, javaOptions)
+
+  def checkbox(self, name, options, defaultChecked=[]):
+    javaOptions = gateway.new_array(self.paramOption, len(options))
+    i = 0
+    for tuple in options:
+      javaOptions[i] = self.paramOption(tuple[0], tuple[1])
+      i += 1
+    javaDefaultCheck = self.javaList()
+    for check in defaultChecked:
+      javaDefaultCheck.append(check)
+    return self.z.getGui().checkbox(name, javaDefaultCheck, javaOptions)
+
+  def show(self, p, **kwargs):
+    if type(p).__name__ == "DataFrame": # does not play well with sub-classes
+      # `isinstance(p, DataFrame)` would req `import 
pandas.core.frame.DataFrame`
+      # and so a dependency on pandas
+      self.show_dataframe(p, **kwargs)
+    elif hasattr(p, '__call__'):
+      p() #error reporting
+
+  def show_dataframe(self, df, show_index=False, **kwargs):
+    """Pretty prints DF using Table Display System
+    """
+    limit = len(df) > self.max_result
+    header_buf = StringIO("")
+    if show_index:
+      idx_name = str(df.index.name) if df.index.name is not None else ""
+      header_buf.write(idx_name + "\t")
+    header_buf.write(str(df.columns[0]))
+    for col in df.columns[1:]:
+      header_buf.write("\t")
+      header_buf.write(str(col))
+    header_buf.write("\n")
+
+    body_buf = StringIO("")
+    rows = df.head(self.max_result).values if limit else df.values
+    index = df.index.values
+    for idx, row in zip(index, rows):
+      if show_index:
+        body_buf.write("%html <strong>{}</strong>".format(idx))
+        body_buf.write("\t")
+      body_buf.write(str(row[0]))
+      for cell in row[1:]:
+        body_buf.write("\t")
+        body_buf.write(str(cell))
+      body_buf.write("\n")
+    body_buf.seek(0); header_buf.seek(0)
+    #TODO(bzz): fix it, so it shows red notice, as in Spark
+    print("%table " + header_buf.read() + body_buf.read()) # +
+    #      ("\n<font color=red>Results are limited by {}.</font>" \
+    #          .format(self.max_result) if limit else "")
+    #)
+    body_buf.close(); header_buf.close()
+
+
+# start JVM gateway
+client = GatewayClient(address='127.0.0.1', port=${JVM_GATEWAY_PORT})
+gateway = JavaGateway(client)
+java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
+intp = gateway.entry_point
+z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext())
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/python/src/main/resources/interpreter-setting.json 
b/python/src/main/resources/interpreter-setting.json
index 3bc42b8..bc1a746 100644
--- a/python/src/main/resources/interpreter-setting.json
+++ b/python/src/main/resources/interpreter-setting.json
@@ -17,6 +17,29 @@
         "defaultValue": "1000",
         "description": "Max number of dataframe rows to display.",
         "type": "number"
+      },
+      "zeppelin.python.useIPython": {
+        "propertyName": "zeppelin.python.useIPython",
+        "defaultValue": true,
+        "description": "whether use IPython when it is available",
+        "type": "checkbox"
+      }
+    },
+    "editor": {
+      "language": "python",
+      "editOnDblClick": false
+    }
+  },
+  {
+    "group": "python",
+    "name": "ipython",
+    "className": "org.apache.zeppelin.python.IPythonInterpreter",
+    "properties": {
+      "zeppelin.ipython.launch.timeout": {
+        "propertyName": "zeppelin.ipython.launch.timeout",
+        "defaultValue": "30000",
+        "description": "time out for ipython launch",
+        "type": "number"
       }
     },
     "editor": {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java 
b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
new file mode 100644
index 0000000..137d622
--- /dev/null
+++ 
b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.python;
+
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.display.ui.CheckBox;
+import org.apache.zeppelin.display.ui.Select;
+import org.apache.zeppelin.display.ui.TextBox;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+
+
+public class IPythonInterpreterTest {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IPythonInterpreterTest.class);
+  private IPythonInterpreter interpreter;
+
+  @Before
+  public void setUp() {
+    Properties properties = new Properties();
+    interpreter = new IPythonInterpreter(properties);
+    InterpreterGroup mockInterpreterGroup = mock(InterpreterGroup.class);
+    interpreter.setInterpreterGroup(mockInterpreterGroup);
+    interpreter.open();
+  }
+
+  @After
+  public void close() {
+    interpreter.close();
+  }
+
+
+  @Test
+  public void testIPython() throws IOException, InterruptedException {
+    testInterpreter(interpreter);
+  }
+
+  public static void testInterpreter(final Interpreter interpreter) throws 
IOException, InterruptedException {
+    // to make this test can run under both python2 and python3
+    InterpreterResult result = interpreter.interpret("from __future__ import 
print_function", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // single output without print
+    InterpreterContext context = getInterpreterContext();
+    result = interpreter.interpret("'hello world'", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> interpreterResultMessages = 
context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("'hello world'", interpreterResultMessages.get(0).getData());
+
+    // only the last statement is printed
+    context = getInterpreterContext();
+    result = interpreter.interpret("'hello world'\n'hello world2'", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("'hello world2'", interpreterResultMessages.get(0).getData());
+
+    // single output
+    context = getInterpreterContext();
+    result = interpreter.interpret("print('hello world')", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("hello world\n", interpreterResultMessages.get(0).getData());
+
+    // multiple output
+    context = getInterpreterContext();
+    result = interpreter.interpret("print('hello world')\nprint('hello 
world2')", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("hello world\nhello world2\n", 
interpreterResultMessages.get(0).getData());
+
+    // assignment
+    context = getInterpreterContext();
+    result = interpreter.interpret("abc=1",context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(0, interpreterResultMessages.size());
+
+    // if block
+    context = getInterpreterContext();
+    result = interpreter.interpret("if abc > 
0:\n\tprint('True')\nelse:\n\tprint('False')", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("True\n", interpreterResultMessages.get(0).getData());
+
+    // for loop
+    context = getInterpreterContext();
+    result = interpreter.interpret("for i in range(3):\n\tprint(i)", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("0\n1\n2\n", interpreterResultMessages.get(0).getData());
+
+    // syntax error
+    context = getInterpreterContext();
+    result = interpreter.interpret("print(unknown)", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertTrue(interpreterResultMessages.get(0).getData().contains("name 
'unknown' is not defined"));
+
+    // raise runtime exception
+    context = getInterpreterContext();
+    result = interpreter.interpret("1/0", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    
assertTrue(interpreterResultMessages.get(0).getData().contains("ZeroDivisionError"));
+
+    // ZEPPELIN-1133
+    context = getInterpreterContext();
+    result = interpreter.interpret("def greet(name):\n" +
+        "    print('Hello', name)\n" +
+        "greet('Jack')", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("Hello Jack\n",interpreterResultMessages.get(0).getData());
+
+    // ZEPPELIN-1114
+    context = getInterpreterContext();
+    result = interpreter.interpret("print('there is no Error: ok')", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("there is no Error: ok\n", 
interpreterResultMessages.get(0).getData());
+
+    // completion
+    context = getInterpreterContext();
+    List<InterpreterCompletion> completions = interpreter.completion("ab", 2, 
context);
+    assertEquals(2, completions.size());
+    assertEquals("abc", completions.get(0).getValue());
+    assertEquals("abs", completions.get(1).getValue());
+
+    context = getInterpreterContext();
+    interpreter.interpret("import sys", context);
+    completions = interpreter.completion("sys.", 4, context);
+    assertFalse(completions.isEmpty());
+
+    context = getInterpreterContext();
+    completions = interpreter.completion("sys.std", 7, context);
+    assertEquals(3, completions.size());
+    assertEquals("sys.stderr", completions.get(0).getValue());
+    assertEquals("sys.stdin", completions.get(1).getValue());
+    assertEquals("sys.stdout", completions.get(2).getValue());
+
+    // there's no completion for 'a.' because it is not recognized by compiler 
for now.
+    context = getInterpreterContext();
+    String st = "a='hello'\na.";
+    completions = interpreter.completion(st, st.length(), context);
+    assertEquals(0, completions.size());
+
+    // define `a` first
+    context = getInterpreterContext();
+    st = "a='hello'";
+    result = interpreter.interpret(st, context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(0, interpreterResultMessages.size());
+
+    // now we can get the completion for `a.`
+    context = getInterpreterContext();
+    st = "a.";
+    completions = interpreter.completion(st, st.length(), context);
+    // it is different for python2 and python3 and may even different for 
different minor version
+    // so only verify it is larger than 20
+    assertTrue(completions.size() > 20);
+
+    context = getInterpreterContext();
+    st = "a.co";
+    completions = interpreter.completion(st, st.length(), context);
+    assertEquals(1, completions.size());
+    assertEquals("a.count", completions.get(0).getValue());
+
+    // cursor is in the middle of code
+    context = getInterpreterContext();
+    st = "a.co\b='hello";
+    completions = interpreter.completion(st, 4, context);
+    assertEquals(1, completions.size());
+    assertEquals("a.count", completions.get(0).getValue());
+
+    // ipython help
+    context = getInterpreterContext();
+    result = interpreter.interpret("range?", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    
assertTrue(interpreterResultMessages.get(0).getData().contains("range(stop)"));
+
+    // timeit
+    context = getInterpreterContext();
+    result = interpreter.interpret("%timeit range(100)", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertTrue(interpreterResultMessages.get(0).getData().contains("loops"));
+
+    // cancel
+    final InterpreterContext context2 = getInterpreterContext();
+    new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        interpreter.cancel(context2);
+      }
+    }.start();
+    result = interpreter.interpret("import time\ntime.sleep(10)", context2);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    interpreterResultMessages = context2.out.getInterpreterResultMessages();
+    
assertTrue(interpreterResultMessages.get(0).getData().contains("KeyboardInterrupt"));
+
+    // matplotlib
+    context = getInterpreterContext();
+    result = interpreter.interpret("%matplotlib inline\nimport 
matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)", 
context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    // the order of IMAGE and TEXT is not determined
+    // check there must be one IMAGE output
+    boolean hasImageOutput = false;
+    for (InterpreterResultMessage msg : interpreterResultMessages) {
+      if (msg.getType() == InterpreterResult.Type.IMG) {
+        hasImageOutput = true;
+      }
+    }
+    assertTrue("No Image Output", hasImageOutput);
+
+    // bokeh
+    // bokeh initialization
+    context = getInterpreterContext();
+    result = interpreter.interpret("from bokeh.io import output_notebook, 
show\n" +
+        "from bokeh.plotting import figure\n" +
+        "output_notebook(notebook_type='zeppelin')", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(2, interpreterResultMessages.size());
+    assertEquals(InterpreterResult.Type.HTML, 
interpreterResultMessages.get(0).getType());
+    assertTrue(interpreterResultMessages.get(0).getData().contains("Loading 
BokehJS"));
+    assertEquals(InterpreterResult.Type.HTML, 
interpreterResultMessages.get(1).getType());
+    assertTrue(interpreterResultMessages.get(1).getData().contains("BokehJS is 
being loaded"));
+
+    // bokeh plotting
+    context = getInterpreterContext();
+    result = interpreter.interpret("from bokeh.plotting import figure, 
output_file, show\n" +
+        "x = [1, 2, 3, 4, 5]\n" +
+        "y = [6, 7, 2, 4, 5]\n" +
+        "p = figure(title=\"simple line example\", x_axis_label='x', 
y_axis_label='y')\n" +
+        "p.line(x, y, legend=\"Temp.\", line_width=2)\n" +
+        "show(p)", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals(InterpreterResult.Type.HTML, 
interpreterResultMessages.get(0).getType());
+    // docs_json is the source data of plotting which bokeh would use to 
render the plotting.
+    
assertTrue(interpreterResultMessages.get(0).getData().contains("docs_json"));
+
+    // ggplot
+    context = getInterpreterContext();
+    result = interpreter.interpret("from ggplot import *\n" +
+        "ggplot(diamonds, aes(x='price', fill='cut')) +\\\n" +
+        "    geom_density(alpha=0.25) +\\\n" +
+        "    facet_wrap(\"clarity\")", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    // the order of IMAGE and TEXT is not determined
+    // check there must be one IMAGE output
+    hasImageOutput = false;
+    for (InterpreterResultMessage msg : interpreterResultMessages) {
+      if (msg.getType() == InterpreterResult.Type.IMG) {
+        hasImageOutput = true;
+      }
+    }
+    assertTrue("No Image Output", hasImageOutput);
+
+    // ZeppelinContext
+
+    // TextBox
+    context = getInterpreterContext();
+    result = interpreter.interpret("z.input(name='text_1', 
defaultValue='value_1')", context);
+    Thread.sleep(100);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    
assertTrue(interpreterResultMessages.get(0).getData().contains("'value_1'"));
+    assertEquals(1, context.getGui().getForms().size());
+    assertTrue(context.getGui().getForms().get("text_1") instanceof TextBox);
+    TextBox textbox = (TextBox) context.getGui().getForms().get("text_1");
+    assertEquals("text_1", textbox.getName());
+    assertEquals("value_1", textbox.getDefaultValue());
+
+    // Select
+    context = getInterpreterContext();
+    result = interpreter.interpret("z.select(name='select_1', 
options=[('value_1', 'name_1'), ('value_2', 'name_2')])", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, context.getGui().getForms().size());
+    assertTrue(context.getGui().getForms().get("select_1") instanceof Select);
+    Select select = (Select) context.getGui().getForms().get("select_1");
+    assertEquals("select_1", select.getName());
+    assertEquals(2, select.getOptions().length);
+    assertEquals("name_1", select.getOptions()[0].getDisplayName());
+    assertEquals("value_1", select.getOptions()[0].getValue());
+
+    // CheckBox
+    context = getInterpreterContext();
+    result = interpreter.interpret("z.checkbox(name='checkbox_1', 
options=[('value_1', 'name_1'), ('value_2', 'name_2')])", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, context.getGui().getForms().size());
+    assertTrue(context.getGui().getForms().get("checkbox_1") instanceof 
CheckBox);
+    CheckBox checkbox = (CheckBox) 
context.getGui().getForms().get("checkbox_1");
+    assertEquals("checkbox_1", checkbox.getName());
+    assertEquals(2, checkbox.getOptions().length);
+    assertEquals("name_1", checkbox.getOptions()[0].getDisplayName());
+    assertEquals("value_1", checkbox.getOptions()[0].getValue());
+
+    // Pandas DataFrame
+    context = getInterpreterContext();
+    result = interpreter.interpret("import pandas as pd\ndf = 
pd.DataFrame({'id':[1,2,3], 'name':['a','b','c']})\nz.show(df)", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    assertEquals(InterpreterResult.Type.TABLE, 
interpreterResultMessages.get(0).getType());
+    assertEquals("id\tname\n1\ta\n2\tb\n3\tc\n", 
interpreterResultMessages.get(0).getData());
+  }
+
+  private static InterpreterContext getInterpreterContext() {
+    return new InterpreterContext(
+        "noteId",
+        "paragraphId",
+        "replName",
+        "paragraphTitle",
+        "paragraphText",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        null,
+        null,
+        null,
+        new InterpreterOutput(null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git 
a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
 
b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
index 8b48b24..d649e89 100644
--- 
a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
+++ 
b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
@@ -53,6 +53,7 @@ public class PythonInterpreterMatplotlibTest implements 
InterpreterOutputListene
     Properties p = new Properties();
     p.setProperty("zeppelin.python", "python");
     p.setProperty("zeppelin.python.maxResult", "100");
+    p.setProperty("zeppelin.python.useIPython", "false");
 
     intpGroup = new InterpreterGroup();
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
----------------------------------------------------------------------
diff --git 
a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
 
b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
index f200a0a..9e918c0 100644
--- 
a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
+++ 
b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
@@ -73,9 +73,21 @@ public class PythonInterpreterPandasSqlTest implements 
InterpreterOutputListener
     Properties p = new Properties();
     p.setProperty("zeppelin.python", "python");
     p.setProperty("zeppelin.python.maxResult", "100");
+    p.setProperty("zeppelin.python.useIPython", "false");
 
     intpGroup = new InterpreterGroup();
 
+    out = new InterpreterOutput(this);
+    context = new InterpreterContext("note", "id", null, "title", "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("id"),
+        new LinkedList<InterpreterContextRunner>(),
+        out);
+    InterpreterContext.set(context);
+
     python = new PythonInterpreter(p);
     python.setInterpreterGroup(intpGroup);
     python.open();
@@ -85,16 +97,7 @@ public class PythonInterpreterPandasSqlTest implements 
InterpreterOutputListener
 
     intpGroup.put("note", Arrays.asList(python, sql));
 
-    out = new InterpreterOutput(this);
 
-    context = new InterpreterContext("note", "id", null, "title", "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("id"),
-        new LinkedList<InterpreterContextRunner>(),
-        out);
 
     // to make sure python is running.
     InterpreterResult ret = python.interpret("\n", context);
@@ -140,10 +143,10 @@ public class PythonInterpreterPandasSqlTest implements 
InterpreterOutputListener
     ret = sql.interpret("select name, age from df2 where age < 40", context);
 
     //then
-    assertEquals(new String(out.getOutputAt(0).toByteArray()), 
InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, 
out.getOutputAt(0).getType());
-    assertTrue(new 
String(out.getOutputAt(0).toByteArray()).indexOf("moon\t33") > 0);
-    assertTrue(new 
String(out.getOutputAt(0).toByteArray()).indexOf("park\t34") > 0);
+    assertEquals(new String(out.getOutputAt(1).toByteArray()), 
InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(new String(out.getOutputAt(1).toByteArray()), Type.TABLE, 
out.getOutputAt(1).getType());
+    assertTrue(new 
String(out.getOutputAt(1).toByteArray()).indexOf("moon\t33") > 0);
+    assertTrue(new 
String(out.getOutputAt(1).toByteArray()).indexOf("park\t34") > 0);
 
     assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case 
when name==\"aa\" then name else name end from df2", context).code());
   }
@@ -156,7 +159,6 @@ public class PythonInterpreterPandasSqlTest implements 
InterpreterOutputListener
     //then
     assertNotNull("Interpreter returned 'null'", ret);
     assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
-    assertTrue(out.toInterpreterResultMessage().size() == 0);
   }
 
   @Test
@@ -176,10 +178,10 @@ public class PythonInterpreterPandasSqlTest implements 
InterpreterOutputListener
 
     // then
     assertEquals(new String(out.getOutputAt(0).toByteArray()), 
InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, 
out.getOutputAt(0).getType());
-    assertTrue(new 
String(out.getOutputAt(0).toByteArray()).contains("index_name"));
-    assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("nan"));
-    assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("6.7"));
+    assertEquals(new String(out.getOutputAt(1).toByteArray()), Type.TABLE, 
out.getOutputAt(1).getType());
+    assertTrue(new 
String(out.getOutputAt(1).toByteArray()).contains("index_name"));
+    assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("nan"));
+    assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("6.7"));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java 
b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
index 837626c..195935d 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
@@ -59,6 +59,7 @@ public class PythonInterpreterTest implements 
InterpreterOutputListener {
     Properties p = new Properties();
     p.setProperty(ZEPPELIN_PYTHON, DEFAULT_ZEPPELIN_PYTHON);
     p.setProperty(MAX_RESULT, "1000");
+    p.setProperty("zeppelin.python.useIPython", "false");
     return p;
   }
 
@@ -85,6 +86,7 @@ public class PythonInterpreterTest implements 
InterpreterOutputListener {
         new LocalResourcePool("id"),
         new LinkedList<InterpreterContextRunner>(),
         out);
+    InterpreterContext.set(context);
     pythonInterpreter.open();
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/python/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/python/src/test/resources/log4j.properties 
b/python/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a8e2c44
--- /dev/null
+++ b/python/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
+#log4j.appender.stdout.layout.ConversionPattern=
+#%5p [%t] (%F:%L) - %m%n
+#%-4r [%t] %-5p %c %x - %m%n
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG
+log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index d35f973..36e4a6c 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -72,6 +72,32 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zeppelin-python</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>net.sf.py4j</groupId>
+          <artifactId>py4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zeppelin-python</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>net.sf.py4j</groupId>
+          <artifactId>py4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
@@ -95,12 +121,6 @@
       <scope>provided</scope>
     </dependency>
 
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
-    </dependency>
-
     <!-- Aether :: maven dependency resolution -->
     <dependency>
       <groupId>org.apache.maven</groupId>
@@ -355,6 +375,12 @@
             <exclude>**/SparkRInterpreterTest.java</exclude>
             <exclude>${pyspark.test.exclude}</exclude>
           </excludes>
+          <environmentVariables>
+            <!-- local pyspark execution needs PYTHONPATH otherwise python 
daemon in executor side will fail
+            e.g. sc.range(1,10).sum()
+            -->
+            
<PYTHONPATH>../interpreter/spark/pyspark/pyspark.zip:../interpreter/spark/pyspark/py4j-${spark.py4j.version}-src.zip:../interpreter/lib/python</PYTHONPATH>
+          </environmentVariables>
         </configuration>
       </plugin>
 
@@ -379,6 +405,19 @@
               <resource>reference.conf</resource>
             </transformer>
           </transformers>
+
+          <relocations>
+            <!-- shade guava and proto-buf, because it might conflict with 
those of spark -->
+            <relocation>
+              <pattern>com.google</pattern>
+              <shadedPattern>org.apache.zeppelin.com.google</shadedPattern>
+            </relocation>
+            <!-- shade netty, because it might conflict with that of spark-->
+            <relocation>
+              <pattern>io.netty</pattern>
+              <shadedPattern>org.apache.zeppelin.io.netty</shadedPattern>
+            </relocation>
+          </relocations>
         </configuration>
         <executions>
           <execution>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
new file mode 100644
index 0000000..f1b1435
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.python.IPythonInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * PySparkInterpreter which use IPython underlying.
+ */
+public class IPySparkInterpreter extends IPythonInterpreter {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IPySparkInterpreter.class);
+
+  private SparkInterpreter sparkInterpreter;
+
+  public IPySparkInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    getProperty().setProperty("zeppelin.python", 
PySparkInterpreter.getPythonExec(property));
+    sparkInterpreter = getSparkInterpreter();
+    SparkConf conf = sparkInterpreter.getSparkContext().getConf();
+    String additionalPythonPath = 
conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
+        ":../interpreter/lib/python";
+    setAdditionalPythonPath(additionalPythonPath);
+    setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
+    super.open();
+  }
+
+  private SparkInterpreter getSparkInterpreter() {
+    LazyOpenInterpreter lazy = null;
+    SparkInterpreter spark = null;
+    Interpreter p = 
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
+      }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
+    }
+    spark = (SparkInterpreter) p;
+
+    if (lazy != null) {
+      lazy.open();
+    }
+    return spark;
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    super.cancel(context);
+    sparkInterpreter.cancel(context);
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    if (sparkInterpreter != null) {
+      sparkInterpreter.close();
+    }
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return sparkInterpreter.getProgress(context);
+  }
+
+  public boolean isSpark2() {
+    return 
sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
+  }
+
+  public JavaSparkContext getJavaSparkContext() {
+    return sparkInterpreter.getJavaSparkContext();
+  }
+
+  public Object getSQLContext() {
+    return sparkInterpreter.getSQLContext();
+  }
+
+  public Object getSparkSession() {
+    return sparkInterpreter.getSparkSession();
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 28910b2..e65df22 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -76,6 +76,8 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
   private static final int MAX_TIMEOUT_SEC = 10;
   private long pythonPid;
 
+  private IPySparkInterpreter iPySparkInterpreter;
+
   public PySparkInterpreter(Properties property) {
     super(property);
 
@@ -111,6 +113,37 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
 
   @Override
   public void open() {
+    // try IPySparkInterpreter first
+    iPySparkInterpreter = getIPySparkInterpreter();
+    if (property.getProperty("zeppelin.spark.useIPython", 
"true").equals("true") &&
+        iPySparkInterpreter.checkIPythonPrerequisite()) {
+      try {
+        iPySparkInterpreter.open();
+        if (InterpreterContext.get() != null) {
+          // don't print it when it is in testing, just for easy output check 
in test.
+          InterpreterContext.get().out.write(("IPython is available, " +
+              "use IPython for PySparkInterpreter\n")
+              .getBytes());
+        }
+        LOGGER.info("Use IPySparkInterpreter to replace PySparkInterpreter");
+        return;
+      } catch (Exception e) {
+        LOGGER.warn("Fail to open IPySparkInterpreter", e);
+      }
+    }
+    iPySparkInterpreter = null;
+
+    if (property.getProperty("zeppelin.spark.useIPython", 
"true").equals("true")) {
+      // don't print it when it is in testing, just for easy output check in 
test.
+      try {
+        InterpreterContext.get().out.write(("IPython is not available, " +
+            "use the native PySparkInterpreter\n")
+            .getBytes());
+      } catch (IOException e) {
+        LOGGER.warn("Fail to write InterpreterOutput", e);
+      }
+    }
+
     // Add matplotlib display hook
     InterpreterGroup intpGroup = getInterpreterGroup();
     if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
@@ -190,9 +223,24 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
       }
     }
 
+    LOGGER.debug("PYTHONPATH: " + env.get("PYTHONPATH"));
     return env;
   }
 
+  // Run python shell
+  // Choose python in the order of
+  // PYSPARK_DRIVER_PYTHON > PYSPARK_PYTHON > zeppelin.pyspark.python
+  public static String getPythonExec(Properties properties) {
+    String pythonExec = properties.getProperty("zeppelin.pyspark.python", 
"python");
+    if (System.getenv("PYSPARK_PYTHON") != null) {
+      pythonExec = System.getenv("PYSPARK_PYTHON");
+    }
+    if (System.getenv("PYSPARK_DRIVER_PYTHON") != null) {
+      pythonExec = System.getenv("PYSPARK_DRIVER_PYTHON");
+    }
+    return pythonExec;
+  }
+
   private void createGatewayServerAndStartScript() {
     // create python script
     createPythonScript();
@@ -202,16 +250,7 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
     gatewayServer = new GatewayServer(this, port);
     gatewayServer.start();
 
-    // Run python shell
-    // Choose python in the order of
-    // PYSPARK_DRIVER_PYTHON > PYSPARK_PYTHON > zeppelin.pyspark.python
-    String pythonExec = getProperty("zeppelin.pyspark.python");
-    if (System.getenv("PYSPARK_PYTHON") != null) {
-      pythonExec = System.getenv("PYSPARK_PYTHON");
-    }
-    if (System.getenv("PYSPARK_DRIVER_PYTHON") != null) {
-      pythonExec = System.getenv("PYSPARK_DRIVER_PYTHON");
-    }
+    String pythonExec = getPythonExec(property);
     CommandLine cmd = CommandLine.parse(pythonExec);
     cmd.addArgument(scriptPath, false);
     cmd.addArgument(Integer.toString(port), false);
@@ -263,6 +302,10 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
 
   @Override
   public void close() {
+    if (iPySparkInterpreter != null) {
+      iPySparkInterpreter.close();
+      return;
+    }
     executor.getWatchdog().destroyProcess();
     new File(scriptPath).delete();
     gatewayServer.shutdown();
@@ -353,6 +396,10 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
           + sparkInterpreter.getSparkVersion().toString() + " is not 
supported");
     }
 
+    if (iPySparkInterpreter != null) {
+      return iPySparkInterpreter.interpret(st, context);
+    }
+
     if (!pythonscriptRunning) {
       return new InterpreterResult(Code.ERROR, "python process not running"
           + outputStream.toString());
@@ -448,6 +495,10 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
 
   @Override
   public void cancel(InterpreterContext context) {
+    if (iPySparkInterpreter != null) {
+      iPySparkInterpreter.cancel(context);
+      return;
+    }
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
     sparkInterpreter.cancel(context);
     try {
@@ -464,6 +515,9 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
 
   @Override
   public int getProgress(InterpreterContext context) {
+    if (iPySparkInterpreter != null) {
+      return iPySparkInterpreter.getProgress(context);
+    }
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
     return sparkInterpreter.getProgress(context);
   }
@@ -472,6 +526,9 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
   @Override
   public List<InterpreterCompletion> completion(String buf, int cursor,
       InterpreterContext interpreterContext) {
+    if (iPySparkInterpreter != null) {
+      return iPySparkInterpreter.completion(buf, cursor, interpreterContext);
+    }
     if (buf.length() < cursor) {
       cursor = buf.length();
     }
@@ -588,6 +645,21 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
     return spark;
   }
 
+  private IPySparkInterpreter getIPySparkInterpreter() {
+    LazyOpenInterpreter lazy = null;
+    IPySparkInterpreter iPySpark = null;
+    Interpreter p = 
getInterpreterInTheSameSessionByClassName(IPySparkInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
+      }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
+    }
+    iPySpark = (IPySparkInterpreter) p;
+    return iPySpark;
+  }
+
   public SparkZeppelinContext getZeppelinContext() {
     SparkInterpreter sparkIntp = getSparkInterpreter();
     if (sparkIntp != null) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32517c9d/spark/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/interpreter-setting.json 
b/spark/src/main/resources/interpreter-setting.json
index e96265f..d646805 100644
--- a/spark/src/main/resources/interpreter-setting.json
+++ b/spark/src/main/resources/interpreter-setting.json
@@ -149,11 +149,28 @@
         "defaultValue": "python",
         "description": "Python command to run pyspark with",
         "type": "string"
+      },
+      "zeppelin.spark.useIPython": {
+        "envName": null,
+        "propertyName": "zeppelin.spark.useIPython",
+        "defaultValue": true,
+        "description": "whether use IPython when it is available",
+        "type": "checkbox"
       }
     },
     "editor": {
       "language": "python",
       "editOnDblClick": false
     }
+  },
+  {
+    "group": "spark",
+    "name": "ipyspark",
+    "className": "org.apache.zeppelin.spark.IPySparkInterpreter",
+    "properties": {},
+    "editor": {
+      "language": "python",
+      "editOnDblClick": false
+    }
   }
 ]

Reply via email to