[ 
https://issues.apache.org/jira/browse/LIVY-995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jianzhen Wu updated LIVY-995:
-----------------------------
    Description: 
Startup  and enable spark.python.profile.
{code:java}
./bin/pyspark --master local --conf spark.python.profile=true
{code}
 
Execute code related to Spark RDD. When pyspark is closed, Pyspark will output 
profile information.
{code:java}
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
[Stage 0:>                                                          (0 + 1) / 1]
100
>>>
============================================================
Profile of RDD<id=1>
============================================================
         244 function calls (241 primitive calls) in 0.001 seconds
 
   Ordered by: internal time, cumulative time
 
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      101    0.000    0.000    0.000    0.000 rdd.py:1237(<genexpr>)
      101    0.000    0.000    0.000    0.000 util.py:72(wrapper)
        1    0.000    0.000    0.000    0.000 serializers.py:255(dump_stream)
        1    0.000    0.000    0.000    0.000 serializers.py:213(load_stream)
        2    0.000    0.000    0.000    0.000 \{built-in method builtins.sum}
        1    0.000    0.000    0.001    0.001 worker.py:607(process)
        1    0.000    0.000    0.000    0.000 context.py:549(f)
        1    0.000    0.000    0.000    0.000 \{built-in method _pickle.dumps}
        1    0.000    0.000    0.000    0.000 serializers.py:561(read_int)
        1    0.000    0.000    0.000    0.000 serializers.py:568(write_int)
      4/1    0.000    0.000    0.000    0.000 rdd.py:2917(pipeline_func)
        1    0.000    0.000    0.000    0.000 serializers.py:426(dumps)
        1    0.000    0.000    0.000    0.000 rdd.py:1237(<lambda>)
        1    0.000    0.000    0.000    0.000 serializers.py:135(load_stream)
        2    0.000    0.000    0.000    0.000 rdd.py:1072(func)
        1    0.000    0.000    0.000    0.000 rdd.py:384(func)
        1    0.000    0.000    0.000    0.000 util.py:67(fail_on_stopiteration)
        1    0.000    0.000    0.000    0.000 
serializers.py:151(_read_with_length)
        2    0.000    0.000    0.000    0.000 context.py:546(getStart)
        3    0.000    0.000    0.000    0.000 rdd.py:416(func)
        1    0.000    0.000    0.000    0.000 
serializers.py:216(_load_stream_without_unbatching)
        2    0.000    0.000    0.000    0.000 \{method 'write' of 
'_io.BufferedWriter' objects}
        1    0.000    0.000    0.000    0.000 \{method 'read' of 
'_io.BufferedReader' objects}
        1    0.000    0.000    0.000    0.000 \{built-in method _operator.add}
        1    0.000    0.000    0.000    0.000 \{built-in method 
builtins.hasattr}
        3    0.000    0.000    0.000    0.000 \{built-in method builtins.len}
        1    0.000    0.000    0.000    0.000 \{built-in method _struct.unpack}
        1    0.000    0.000    0.000    0.000 rdd.py:1226(<lambda>)
        1    0.000    0.000    0.000    0.000 \{method 'close' of 'generator' 
objects}
        1    0.000    0.000    0.000    0.000 \{built-in method from_iterable}
        1    0.000    0.000    0.000    0.000 \{built-in method _struct.pack}
        1    0.000    0.000    0.000    0.000 \{method 'disable' of 
'_lsprof.Profiler' objects}
        1    0.000    0.000    0.000    0.000 \{built-in method builtins.iter}
{code}
 
This is because Spark register show_profiles when Spark exit in profile.py 
{code:java}
    def add_profiler(self, id, profiler):
        """Add a profiler for RDD/UDF `id`"""
        if not self.profilers:
            if self.profile_dump_path:
                atexit.register(self.dump_profiles, self.profile_dump_path)
            else:
                atexit.register(self.show_profiles)
 
        self.profilers.append([id, profiler, False])
{code}
 
 
For Livy session, Livy does not convert the output to JSON format. And throw 
below exception:
 
{code:java}
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('=' (code 
61)): expected a valid value (JSON String, Number, Array, Object or token 
'null', 'true' or 'false')
 at [Source: 
(String)"============================================================"; line: 
1, column: 2]
        at 
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
        at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
        at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:635)
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1952)
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:781)
        at 
com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:355)
        at 
com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2023)
        at 
com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1491)
        at 
org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse(JsonMethods.scala:33)
        at 
org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse$(JsonMethods.scala:20)
        at 
org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:71)
        at 
org.apache.livy.repl.PythonInterpreter.$anonfun$sendRequest$1(PythonInterpreter.scala:288)
        at scala.Option.map(Option.scala:230)
        at 
org.apache.livy.repl.PythonInterpreter.sendRequest(PythonInterpreter.scala:287)
        at 
org.apache.livy.repl.PythonInterpreter.sendShutdownRequest(PythonInterpreter.scala:277)
        at 
org.apache.livy.repl.ProcessInterpreter.close(ProcessInterpreter.scala:62)
        at 
org.apache.livy.repl.PythonInterpreter.close(PythonInterpreter.scala:234)
        at org.apache.livy.repl.Session.$anonfun$close$1(Session.scala:232)
        at 
org.apache.livy.repl.Session.$anonfun$close$1$adapted(Session.scala:232)
        at 
scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
        at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
        at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
        at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
        at org.apache.livy.repl.Session.close(Session.scala:232)
        at 
org.apache.livy.toolkit.IpynbBootstrap.close(IpynbBootstrap.scala:246)
        at org.apache.livy.toolkit.IpynbBootstrap$.main(IpynbBootstrap.scala:72)
        at org.apache.livy.toolkit.IpynbBootstrap.main(IpynbBootstrap.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:764)
 {code}
 Livy sendShutdownRequest in PythonInterpreter.scala
{code:scala}
  override protected def sendShutdownRequest(): Unit = {
    sendRequest(Map(
      "msg_type" -> "shutdown_request",
      "content" -> ()
    )).foreach { case rep =>
      warn(f"process failed to shut down while returning $rep")
    }
  }

  private def sendRequest(request: Map[String, Any]): Option[JValue] = {
    stdin.println(write(request))
    stdin.flush()

    Option(stdout.readLine()).map { case line =>
      parse(line)
    }
  }
{code}
Livy does not convert stdout to json when exit in fake_shell.py 
{code:python}
def shutdown_request(_content):
    sys.exit()

msg_type_router = {
    'execute_request': execute_request,
    'shutdown_request': shutdown_request,
}

            try:
                handler = msg_type_router[msg_type]
            except KeyError:
                LOG.error('unknown message type: %s', msg_type)
                continue

            response = handler(content)
{code}

  was:
Startup  and enable spark.python.profile.
{code:java}
./bin/pyspark --master local --conf spark.python.profile=true
{code}
 
Execute code related to Spark RDD. When pyspark is closed, Pyspark will output 
profile information.
{code:java}
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
[Stage 0:>                                                          (0 + 1) / 1]
100
>>>
============================================================
Profile of RDD<id=1>
============================================================
         244 function calls (241 primitive calls) in 0.001 seconds
 
   Ordered by: internal time, cumulative time
 
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      101    0.000    0.000    0.000    0.000 rdd.py:1237(<genexpr>)
      101    0.000    0.000    0.000    0.000 util.py:72(wrapper)
        1    0.000    0.000    0.000    0.000 serializers.py:255(dump_stream)
        1    0.000    0.000    0.000    0.000 serializers.py:213(load_stream)
        2    0.000    0.000    0.000    0.000 \{built-in method builtins.sum}
        1    0.000    0.000    0.001    0.001 worker.py:607(process)
        1    0.000    0.000    0.000    0.000 context.py:549(f)
        1    0.000    0.000    0.000    0.000 \{built-in method _pickle.dumps}
        1    0.000    0.000    0.000    0.000 serializers.py:561(read_int)
        1    0.000    0.000    0.000    0.000 serializers.py:568(write_int)
      4/1    0.000    0.000    0.000    0.000 rdd.py:2917(pipeline_func)
        1    0.000    0.000    0.000    0.000 serializers.py:426(dumps)
        1    0.000    0.000    0.000    0.000 rdd.py:1237(<lambda>)
        1    0.000    0.000    0.000    0.000 serializers.py:135(load_stream)
        2    0.000    0.000    0.000    0.000 rdd.py:1072(func)
        1    0.000    0.000    0.000    0.000 rdd.py:384(func)
        1    0.000    0.000    0.000    0.000 util.py:67(fail_on_stopiteration)
        1    0.000    0.000    0.000    0.000 
serializers.py:151(_read_with_length)
        2    0.000    0.000    0.000    0.000 context.py:546(getStart)
        3    0.000    0.000    0.000    0.000 rdd.py:416(func)
        1    0.000    0.000    0.000    0.000 
serializers.py:216(_load_stream_without_unbatching)
        2    0.000    0.000    0.000    0.000 \{method 'write' of 
'_io.BufferedWriter' objects}
        1    0.000    0.000    0.000    0.000 \{method 'read' of 
'_io.BufferedReader' objects}
        1    0.000    0.000    0.000    0.000 \{built-in method _operator.add}
        1    0.000    0.000    0.000    0.000 \{built-in method 
builtins.hasattr}
        3    0.000    0.000    0.000    0.000 \{built-in method builtins.len}
        1    0.000    0.000    0.000    0.000 \{built-in method _struct.unpack}
        1    0.000    0.000    0.000    0.000 rdd.py:1226(<lambda>)
        1    0.000    0.000    0.000    0.000 \{method 'close' of 'generator' 
objects}
        1    0.000    0.000    0.000    0.000 \{built-in method from_iterable}
        1    0.000    0.000    0.000    0.000 \{built-in method _struct.pack}
        1    0.000    0.000    0.000    0.000 \{method 'disable' of 
'_lsprof.Profiler' objects}
        1    0.000    0.000    0.000    0.000 \{built-in method builtins.iter}
{code}
 
This is because Spark register show_profiles when Spark exit in profile.py 
{code:java}
    def add_profiler(self, id, profiler):
        """Add a profiler for RDD/UDF `id`"""
        if not self.profilers:
            if self.profile_dump_path:
                atexit.register(self.dump_profiles, self.profile_dump_path)
            else:
                atexit.register(self.show_profiles)
 
        self.profilers.append([id, profiler, False])
{code}
 
 
For Livy session, Livy does not convert the output to JSON format. And throw 
below exception:
 
{code:java}
24/01/17 11:17:30 INFO [shutdown-hook-0] ApplicationMaster: Unregistering 
ApplicationMaster with FAILED (diag message: User class threw exception: 
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('=' (code 
61)): expected a valid value (JSON String, Number, Array, Object or token 
'null', 'true' or 'false')
 at [Source: 
(String)"============================================================"; line: 
1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:635)
at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1952)
at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:781)
at 
com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:355)
at 
com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2023)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1491)
at 
org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse(JsonMethods.scala:33)
at 
org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse$(JsonMethods.scala:20)
at 
org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:71)
at 
org.apache.livy.repl.PythonInterpreter.$anonfun$sendRequest$1(PythonInterpreter.scala:288)
at scala.Option.map(Option.scala:230)
at 
org.apache.livy.repl.PythonInterpreter.sendRequest(PythonInterpreter.scala:287)
at 
org.apache.livy.repl.PythonInterpreter.sendShutdownRequest(PythonInterpreter.scala:277)
at org.apache.livy.repl.ProcessInterpreter.close(ProcessInterpreter.scala:62)
at org.apache.livy.repl.PythonInterpreter.close(PythonInterpreter.scala:234)
at org.apache.livy.repl.Session.$anonfun$close$1(Session.scala:232)
at org.apache.livy.repl.Session.$anonfun$close$1$adapted(Session.scala:232)
at 
scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
at org.apache.livy.repl.Session.close(Session.scala:232)
at org.apache.livy.toolkit.IpynbBootstrap.close(IpynbBootstrap.scala:246)
at org.apache.livy.toolkit.IpynbBootstrap$.main(IpynbBootstrap.scala:72)
at org.apache.livy.toolkit.IpynbBootstrap.main(IpynbBootstrap.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:764)
)
{code}
 Livy sendShutdownRequest in PythonInterpreter.scala

{code:scala}
  override protected def sendShutdownRequest(): Unit = {
    sendRequest(Map(
      "msg_type" -> "shutdown_request",
      "content" -> ()
    )).foreach { case rep =>
      warn(f"process failed to shut down while returning $rep")
    }
  }

  private def sendRequest(request: Map[String, Any]): Option[JValue] = {
    stdin.println(write(request))
    stdin.flush()

    Option(stdout.readLine()).map { case line =>
      parse(line)
    }
  }
{code}

Livy does not convert stdout to json when exit in fake_shell.py 
{code:python}

def shutdown_request(_content):
    sys.exit()

msg_type_router = {
    'execute_request': execute_request,
    'shutdown_request': shutdown_request,
}

            try:
                handler = msg_type_router[msg_type]
            except KeyError:
                LOG.error('unknown message type: %s', msg_type)
                continue

            response = handler(content)
{code}


> JsonParseException is thrown when closing Livy session when using python 
> profile
> --------------------------------------------------------------------------------
>
>                 Key: LIVY-995
>                 URL: https://issues.apache.org/jira/browse/LIVY-995
>             Project: Livy
>          Issue Type: Improvement
>          Components: REPL
>            Reporter: Jianzhen Wu
>            Assignee: Jianzhen Wu
>            Priority: Critical
>             Fix For: 0.9.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Startup  and enable spark.python.profile.
> {code:java}
> ./bin/pyspark --master local --conf spark.python.profile=true
> {code}
>  
> Execute code related to Spark RDD. When pyspark is closed, Pyspark will 
> output profile information.
> {code:java}
> >>> rdd = sc.parallelize(range(100)).map(str)
> >>> rdd.count()
> [Stage 0:>                                                          (0 + 1) / 
> 1]
> 100
> >>>
> ============================================================
> Profile of RDD<id=1>
> ============================================================
>          244 function calls (241 primitive calls) in 0.001 seconds
>  
>    Ordered by: internal time, cumulative time
>  
>    ncalls  tottime  percall  cumtime  percall filename:lineno(function)
>       101    0.000    0.000    0.000    0.000 rdd.py:1237(<genexpr>)
>       101    0.000    0.000    0.000    0.000 util.py:72(wrapper)
>         1    0.000    0.000    0.000    0.000 serializers.py:255(dump_stream)
>         1    0.000    0.000    0.000    0.000 serializers.py:213(load_stream)
>         2    0.000    0.000    0.000    0.000 \{built-in method builtins.sum}
>         1    0.000    0.000    0.001    0.001 worker.py:607(process)
>         1    0.000    0.000    0.000    0.000 context.py:549(f)
>         1    0.000    0.000    0.000    0.000 \{built-in method _pickle.dumps}
>         1    0.000    0.000    0.000    0.000 serializers.py:561(read_int)
>         1    0.000    0.000    0.000    0.000 serializers.py:568(write_int)
>       4/1    0.000    0.000    0.000    0.000 rdd.py:2917(pipeline_func)
>         1    0.000    0.000    0.000    0.000 serializers.py:426(dumps)
>         1    0.000    0.000    0.000    0.000 rdd.py:1237(<lambda>)
>         1    0.000    0.000    0.000    0.000 serializers.py:135(load_stream)
>         2    0.000    0.000    0.000    0.000 rdd.py:1072(func)
>         1    0.000    0.000    0.000    0.000 rdd.py:384(func)
>         1    0.000    0.000    0.000    0.000 
> util.py:67(fail_on_stopiteration)
>         1    0.000    0.000    0.000    0.000 
> serializers.py:151(_read_with_length)
>         2    0.000    0.000    0.000    0.000 context.py:546(getStart)
>         3    0.000    0.000    0.000    0.000 rdd.py:416(func)
>         1    0.000    0.000    0.000    0.000 
> serializers.py:216(_load_stream_without_unbatching)
>         2    0.000    0.000    0.000    0.000 \{method 'write' of 
> '_io.BufferedWriter' objects}
>         1    0.000    0.000    0.000    0.000 \{method 'read' of 
> '_io.BufferedReader' objects}
>         1    0.000    0.000    0.000    0.000 \{built-in method _operator.add}
>         1    0.000    0.000    0.000    0.000 \{built-in method 
> builtins.hasattr}
>         3    0.000    0.000    0.000    0.000 \{built-in method builtins.len}
>         1    0.000    0.000    0.000    0.000 \{built-in method 
> _struct.unpack}
>         1    0.000    0.000    0.000    0.000 rdd.py:1226(<lambda>)
>         1    0.000    0.000    0.000    0.000 \{method 'close' of 'generator' 
> objects}
>         1    0.000    0.000    0.000    0.000 \{built-in method from_iterable}
>         1    0.000    0.000    0.000    0.000 \{built-in method _struct.pack}
>         1    0.000    0.000    0.000    0.000 \{method 'disable' of 
> '_lsprof.Profiler' objects}
>         1    0.000    0.000    0.000    0.000 \{built-in method builtins.iter}
> {code}
>  
> This is because Spark register show_profiles when Spark exit in profile.py 
> {code:java}
>     def add_profiler(self, id, profiler):
>         """Add a profiler for RDD/UDF `id`"""
>         if not self.profilers:
>             if self.profile_dump_path:
>                 atexit.register(self.dump_profiles, self.profile_dump_path)
>             else:
>                 atexit.register(self.show_profiles)
>  
>         self.profilers.append([id, profiler, False])
> {code}
>  
>  
> For Livy session, Livy does not convert the output to JSON format. And throw 
> below exception:
>  
> {code:java}
> com.fasterxml.jackson.core.JsonParseException: Unexpected character ('=' 
> (code 61)): expected a valid value (JSON String, Number, Array, Object or 
> token 'null', 'true' or 'false')
>  at [Source: 
> (String)"============================================================"; line: 
> 1, column: 2]
>       at 
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
>       at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
>       at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:635)
>       at 
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1952)
>       at 
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:781)
>       at 
> com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:355)
>       at 
> com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2023)
>       at 
> com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1491)
>       at 
> org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse(JsonMethods.scala:33)
>       at 
> org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse$(JsonMethods.scala:20)
>       at 
> org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:71)
>       at 
> org.apache.livy.repl.PythonInterpreter.$anonfun$sendRequest$1(PythonInterpreter.scala:288)
>       at scala.Option.map(Option.scala:230)
>       at 
> org.apache.livy.repl.PythonInterpreter.sendRequest(PythonInterpreter.scala:287)
>       at 
> org.apache.livy.repl.PythonInterpreter.sendShutdownRequest(PythonInterpreter.scala:277)
>       at 
> org.apache.livy.repl.ProcessInterpreter.close(ProcessInterpreter.scala:62)
>       at 
> org.apache.livy.repl.PythonInterpreter.close(PythonInterpreter.scala:234)
>       at org.apache.livy.repl.Session.$anonfun$close$1(Session.scala:232)
>       at 
> org.apache.livy.repl.Session.$anonfun$close$1$adapted(Session.scala:232)
>       at 
> scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
>       at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>       at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>       at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
>       at org.apache.livy.repl.Session.close(Session.scala:232)
>       at 
> org.apache.livy.toolkit.IpynbBootstrap.close(IpynbBootstrap.scala:246)
>       at org.apache.livy.toolkit.IpynbBootstrap$.main(IpynbBootstrap.scala:72)
>       at org.apache.livy.toolkit.IpynbBootstrap.main(IpynbBootstrap.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:764)
>  {code}
>  Livy sendShutdownRequest in PythonInterpreter.scala
> {code:scala}
>   override protected def sendShutdownRequest(): Unit = {
>     sendRequest(Map(
>       "msg_type" -> "shutdown_request",
>       "content" -> ()
>     )).foreach { case rep =>
>       warn(f"process failed to shut down while returning $rep")
>     }
>   }
>   private def sendRequest(request: Map[String, Any]): Option[JValue] = {
>     stdin.println(write(request))
>     stdin.flush()
>     Option(stdout.readLine()).map { case line =>
>       parse(line)
>     }
>   }
> {code}
> Livy does not convert stdout to json when exit in fake_shell.py 
> {code:python}
> def shutdown_request(_content):
>     sys.exit()
> msg_type_router = {
>     'execute_request': execute_request,
>     'shutdown_request': shutdown_request,
> }
>             try:
>                 handler = msg_type_router[msg_type]
>             except KeyError:
>                 LOG.error('unknown message type: %s', msg_type)
>                 continue
>             response = handler(content)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to