gaogaotiantian commented on code in PR #55716:
URL: https://github.com/apache/spark/pull/55716#discussion_r3276789681


##########
python/pyspark/tests/test_spark_message_receiver.py:
##########
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import io
+import unittest
+from typing import BinaryIO
+
+from pyspark.messages.spark_message_receiver import SparkMessageReceiver
+from pyspark.messages.zero_copy_byte_stream import ZeroCopyByteStream
+
+
+class StubMessageReceiver(SparkMessageReceiver):
+    """Concrete stub for testing the state machine in SparkMessageReceiver."""
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.stream_finished = True
+
+    def _do_get_init_message(self) -> ZeroCopyByteStream:
+        return ZeroCopyByteStream(memoryview(b"init"))
+
+    def _do_get_data_stream(self) -> BinaryIO:
+        return io.BytesIO(b"data")
+
+    def _do_is_stream_finished(self) -> bool:
+        return self.stream_finished
+
+
+class SparkMessageReceiverTests(unittest.TestCase):
+    """Tests for SparkMessageReceiver state transitions."""
+
+    def test_happy_path(self):
+        """Calling init -> data -> finish in order succeeds."""
+        receiver = StubMessageReceiver()
+        init_msg = receiver.get_init_message()
+        self.assertIsInstance(init_msg, ZeroCopyByteStream)
+        data = receiver.get_data_stream()
+        self.assertEqual(data.read(), b"data")
+        self.assertTrue(receiver.is_stream_finished())
+
+    def test_invalid_transitions_fail(self):
+        """Calling methods out of order raises AssertionError."""
+        # Each entry: (setup_calls, invalid_call, description)
+        # setup_calls are executed first to reach a certain state,
+        # then invalid_call is expected to raise AssertionError.
+        cases = [
+            ([], "get_data_stream", "data before init"),
+            ([], "is_stream_finished", "finish before init"),
+            (["get_init_message"], "get_init_message", "double init"),
+            (["get_init_message"], "is_stream_finished", "finish before data"),
+            (["get_init_message", "get_data_stream"], "get_init_message", 
"init after data"),
+            (["get_init_message", "get_data_stream"], "get_data_stream", 
"double data"),
+            (
+                ["get_init_message", "get_data_stream", "is_stream_finished"],
+                "get_init_message",
+                "init after done",
+            ),
+            (
+                ["get_init_message", "get_data_stream", "is_stream_finished"],
+                "get_data_stream",
+                "data after done",
+            ),
+            (
+                ["get_init_message", "get_data_stream", "is_stream_finished"],
+                "is_stream_finished",
+                "double finish",
+            ),
+        ]
+        for setup_calls, invalid_call, description in cases:
+            with self.subTest(description):
+                receiver = StubMessageReceiver()
+                for call in setup_calls:
+                    getattr(receiver, call)()
+                with self.assertRaises(AssertionError):
+                    getattr(receiver, invalid_call)()
+
+    def test_finish_returns_false_when_stream_not_finished(self):
+        receiver = StubMessageReceiver()
+        receiver.stream_finished = False
+        receiver.get_init_message()
+        receiver.get_data_stream()
+        self.assertFalse(receiver.is_stream_finished())
+
+
+if __name__ == "__main__":
+    from pyspark.tests.test_spark_message_receiver import *  # noqa: F403

Review Comment:
   Follow the new patterns for this. We don't write tests like this anymore (I 
believe I changed all others).



##########
python/pyspark/messages/spark_message_receiver.py:
##########
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from enum import Enum
+from functools import wraps
+from typing import BinaryIO, Callable, TypeVar
+from abc import ABC, abstractmethod
+
+from pyspark.messages.zero_copy_byte_stream import ZeroCopyByteStream
+
+
+T = TypeVar("T", bound="SparkMessageReceiver")
+R = TypeVar("R")
+
+
+class MessageState(Enum):
+    WAITING_FOR_INIT = 1
+    WAITING_FOR_DATA = 2
+    WAITING_FOR_FINISH = 3
+    DONE = 4
+
+
+class SparkMessageReceiver(ABC):
+    """
+    Generic class that implements receiving messages from Spark.
+    Caution: This class is STATEFUL. It is expected, that the
+    methods of this class are called in the following order:
+
+    1. Init -> 2. Data stream -> 3. Finish
+
+    This order is verified using assertions in the class. Each function
+    can be called EXACTLY ONCE in the specified order.
+    """
+
+    def __init__(self) -> None:
+        self._state = MessageState.WAITING_FOR_INIT
+
+    @staticmethod
+    def _state_transition(
+        required_state: MessageState, next_state: MessageState
+    ) -> Callable[[Callable[[T], R]], Callable[[T], R]]:
+        """Decorator to enforce state transitions."""
+
+        def decorator(func: Callable[[T], R]) -> Callable[[T], R]:
+            @wraps(func)
+            def wrapper(self: T) -> R:
+                assert self._state == required_state
+                result = func(self)
+                self._state = next_state
+                return result
+
+            return wrapper
+
+        return decorator
+
+    @_state_transition(MessageState.WAITING_FOR_INIT, 
MessageState.WAITING_FOR_DATA)
+    def get_init_message(self) -> ZeroCopyByteStream:
+        """
+        Returns:
+            the binary contents of the initial message as a ZeroCopyByteStream.
+        """
+        return self._do_get_init_message()
+
+    @_state_transition(MessageState.WAITING_FOR_DATA, 
MessageState.WAITING_FOR_FINISH)
+    def get_data_stream(self) -> BinaryIO:
+        """
+        Returns:
+            A binary stream containing the data to invoke the UDF on.
+        """
+        return self._do_get_data_stream()
+
+    @_state_transition(MessageState.WAITING_FOR_FINISH, MessageState.DONE)
+    def is_stream_finished(self) -> bool:

Review Comment:
   This is a bit weird. It's very difficult to imagine a function called 
`is_stream_finished` is a function that you should only call once. The name is 
different than all other `get_` methods. We should rename this function to 
imply that this is a state transform method. If we do need a 
`is_stream_finished`, it should be re-callable.



##########
python/pyspark/tests/test_spark_message_receiver.py:
##########
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import io
+import unittest
+from typing import BinaryIO
+
+from pyspark.messages.spark_message_receiver import SparkMessageReceiver
+from pyspark.messages.zero_copy_byte_stream import ZeroCopyByteStream
+
+
+class StubMessageReceiver(SparkMessageReceiver):
+    """Concrete stub for testing the state machine in SparkMessageReceiver."""
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.stream_finished = True
+
+    def _do_get_init_message(self) -> ZeroCopyByteStream:
+        return ZeroCopyByteStream(memoryview(b"init"))
+
+    def _do_get_data_stream(self) -> BinaryIO:
+        return io.BytesIO(b"data")
+
+    def _do_is_stream_finished(self) -> bool:
+        return self.stream_finished
+
+
+class SparkMessageReceiverTests(unittest.TestCase):
+    """Tests for SparkMessageReceiver state transitions."""
+
+    def test_happy_path(self):
+        """Calling init -> data -> finish in order succeeds."""
+        receiver = StubMessageReceiver()
+        init_msg = receiver.get_init_message()
+        self.assertIsInstance(init_msg, ZeroCopyByteStream)
+        data = receiver.get_data_stream()
+        self.assertEqual(data.read(), b"data")
+        self.assertTrue(receiver.is_stream_finished())
+
+    def test_invalid_transitions_fail(self):
+        """Calling methods out of order raises AssertionError."""
+        # Each entry: (setup_calls, invalid_call, description)
+        # setup_calls are executed first to reach a certain state,
+        # then invalid_call is expected to raise AssertionError.
+        cases = [

Review Comment:
   These are unnecessarily too many. The code path covered are just 3 methods, 
we don't need to do a permutation here.



##########
python/pyspark/messages/spark_message_receiver.py:
##########
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from enum import Enum
+from functools import wraps
+from typing import BinaryIO, Callable, TypeVar
+from abc import ABC, abstractmethod
+
+from pyspark.messages.zero_copy_byte_stream import ZeroCopyByteStream
+
+
+T = TypeVar("T", bound="SparkMessageReceiver")
+R = TypeVar("R")
+
+
+class MessageState(Enum):
+    WAITING_FOR_INIT = 1
+    WAITING_FOR_DATA = 2
+    WAITING_FOR_FINISH = 3
+    DONE = 4
+
+
+class SparkMessageReceiver(ABC):
+    """
+    Generic class that implements receiving messages from Spark.
+    Caution: This class is STATEFUL. It is expected, that the
+    methods of this class are called in the following order:
+
+    1. Init -> 2. Data stream -> 3. Finish
+
+    This order is verified using assertions in the class. Each function
+    can be called EXACTLY ONCE in the specified order.
+    """
+
+    def __init__(self) -> None:
+        self._state = MessageState.WAITING_FOR_INIT
+
+    @staticmethod
+    def _state_transition(
+        required_state: MessageState, next_state: MessageState
+    ) -> Callable[[Callable[[T], R]], Callable[[T], R]]:
+        """Decorator to enforce state transitions."""
+
+        def decorator(func: Callable[[T], R]) -> Callable[[T], R]:
+            @wraps(func)
+            def wrapper(self: T) -> R:
+                assert self._state == required_state
+                result = func(self)
+                self._state = next_state
+                return result
+
+            return wrapper
+
+        return decorator
+
+    @_state_transition(MessageState.WAITING_FOR_INIT, 
MessageState.WAITING_FOR_DATA)
+    def get_init_message(self) -> ZeroCopyByteStream:
+        """
+        Returns:
+            the binary contents of the initial message as a ZeroCopyByteStream.
+        """
+        return self._do_get_init_message()
+
+    @_state_transition(MessageState.WAITING_FOR_DATA, 
MessageState.WAITING_FOR_FINISH)
+    def get_data_stream(self) -> BinaryIO:
+        """
+        Returns:
+            A binary stream containing the data to invoke the UDF on.
+        """
+        return self._do_get_data_stream()
+
+    @_state_transition(MessageState.WAITING_FOR_FINISH, MessageState.DONE)
+    def is_stream_finished(self) -> bool:
+        """
+        Checks if a finish message was received
+        from the JVM. The finish message itself only
+        has a message id and marks the end of the stream.
+        If bytes different from the finish id are read
+        this means something went wrong while consuming the stream.
+        """
+        return self._do_is_stream_finished()
+
+    @abstractmethod
+    def _do_get_init_message(self) -> ZeroCopyByteStream:
+        """
+        Returns the contents of the init message
+        as a 'ZeroCopyByteStream'.
+
+        To be implemented by child classes.
+        """
+        pass

Review Comment:
   Conventionally, if we want to leave the function body empty, we should do 
`...` instead of `pass`. I think that's more common for `abstractmethod`.



##########
python/pyspark/tests/test_spark_message_receiver.py:
##########
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import io
+import unittest
+from typing import BinaryIO
+
+from pyspark.messages.spark_message_receiver import SparkMessageReceiver
+from pyspark.messages.zero_copy_byte_stream import ZeroCopyByteStream
+
+
+class StubMessageReceiver(SparkMessageReceiver):
+    """Concrete stub for testing the state machine in SparkMessageReceiver."""
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.stream_finished = True
+
+    def _do_get_init_message(self) -> ZeroCopyByteStream:
+        return ZeroCopyByteStream(memoryview(b"init"))
+
+    def _do_get_data_stream(self) -> BinaryIO:
+        return io.BytesIO(b"data")
+
+    def _do_is_stream_finished(self) -> bool:
+        return self.stream_finished
+
+
+class SparkMessageReceiverTests(unittest.TestCase):
+    """Tests for SparkMessageReceiver state transitions."""
+
+    def test_happy_path(self):
+        """Calling init -> data -> finish in order succeeds."""
+        receiver = StubMessageReceiver()
+        init_msg = receiver.get_init_message()
+        self.assertIsInstance(init_msg, ZeroCopyByteStream)
+        data = receiver.get_data_stream()
+        self.assertEqual(data.read(), b"data")
+        self.assertTrue(receiver.is_stream_finished())
+
+    def test_invalid_transitions_fail(self):
+        """Calling methods out of order raises AssertionError."""
+        # Each entry: (setup_calls, invalid_call, description)
+        # setup_calls are executed first to reach a certain state,
+        # then invalid_call is expected to raise AssertionError.
+        cases = [
+            ([], "get_data_stream", "data before init"),
+            ([], "is_stream_finished", "finish before init"),
+            (["get_init_message"], "get_init_message", "double init"),
+            (["get_init_message"], "is_stream_finished", "finish before data"),
+            (["get_init_message", "get_data_stream"], "get_init_message", 
"init after data"),
+            (["get_init_message", "get_data_stream"], "get_data_stream", 
"double data"),
+            (
+                ["get_init_message", "get_data_stream", "is_stream_finished"],
+                "get_init_message",
+                "init after done",
+            ),
+            (
+                ["get_init_message", "get_data_stream", "is_stream_finished"],
+                "get_data_stream",
+                "data after done",
+            ),
+            (
+                ["get_init_message", "get_data_stream", "is_stream_finished"],
+                "is_stream_finished",
+                "double finish",
+            ),
+        ]
+        for setup_calls, invalid_call, description in cases:
+            with self.subTest(description):
+                receiver = StubMessageReceiver()
+                for call in setup_calls:
+                    getattr(receiver, call)()
+                with self.assertRaises(AssertionError):
+                    getattr(receiver, invalid_call)()
+
+    def test_finish_returns_false_when_stream_not_finished(self):
+        receiver = StubMessageReceiver()

Review Comment:
   I don't understand what this test is doing.



##########
python/pyspark/worker_message.py:
##########
@@ -270,3 +279,11 @@ def from_stream(cls, stream: IO) -> "WorkerInitInfo":
             eval_conf=eval_conf,
             udf_info=udf_info,
         )
+
+
+def is_non_udf_info(

Review Comment:
   Huh, I think this is a bit too much. Let's just do an `assert isinstance()` 
if mypy complains. Or even an ignore. This method does not really do anything.



##########
python/pyspark/worker_message.py:
##########
@@ -46,7 +47,7 @@ class ResourceInfo:
     local_properties: dict[str, str]
 
     @classmethod
-    def from_stream(cls, stream: IO) -> "TaskContextInfo":
+    def from_stream(cls, stream: ZeroCopyByteStream) -> "TaskContextInfo":

Review Comment:
   I'm a bit concerned to bind the type to `ZeroCopyByteStream`. Same concern 
for `memoryview` vs `bytes`. I think we should do something more general. Maybe 
in the future we support more than the current `ZeroCopyByteStream`, or maybe 
we want to add some unittests in the future.
   
   For `ZeroCopyByteStream`, it is a kind of `IO` right? Can we make it a real 
`IO`? That supports the underlying protocols of an `IO` so we can generalize it?
   
   For `bytes` vs `memoryview`, we have `collections.abc.Buffer` after 3.12, 
which is the way to go but we need to support lower versions.
   
   Maybe I'm thinking too much about the future. It's probably fine that we 
leave it here for now. But at least we should leave a comment somewhere about 
`collections.abc.Buffer`.



##########
python/pyspark/worker_util.py:
##########
@@ -65,11 +67,11 @@ def add_path(path: str) -> bool:
     return False
 
 
-def read_command(serializer: FramedSerializer, file: Union[IO, bytes]) -> Any:
+def read_command(serializer: FramedSerializer, file: Union[IO, memoryview]) -> 
Any:

Review Comment:
   Yeah this is a tricky part. We should not throw away the support for 
`bytes`. We can add `memoryview` but let's keep `bytes` working. Have you tried 
the asv benchmark? It actually simulates the message to python workers. Is it 
still working?



##########
python/benchmarks/bench_eval_type.py:
##########
@@ -127,6 +127,47 @@ def write_preamble(cls, buf: io.BytesIO) -> None:
         cls.write_bool(False, buf)  # needs_broadcast_decryption_server
         write_int(0, buf)  # num_broadcast_variables
 
+    @classmethod
+    def write_init_message(
+        cls,
+        eval_type: int,
+        write_udf: Callable[[io.BufferedIOBase], None],
+        target_buffer: io.BytesIO,
+        runner_conf: dict[str, str] | None = None,
+        eval_conf: dict[str, str] | None = None,
+    ) -> None:
+        """Write the initial message with header, length + its data."""
+
+        # Write everything to a seperate buffer so we can
+        # determine the length of the initial message.
+        buf = io.BytesIO()
+        cls.write_preamble(buf)
+        write_int(eval_type, buf)
+        if runner_conf:
+            write_int(len(runner_conf), buf)
+            for k, v in runner_conf.items():
+                cls.write_utf8(k, buf)
+                cls.write_utf8(v, buf)
+        else:
+            write_int(0, buf)  # RunnerConf  (0 key-value pairs)
+        if eval_conf:
+            write_int(len(eval_conf), buf)
+            for k, v in eval_conf.items():
+                cls.write_utf8(k, buf)
+                cls.write_utf8(v, buf)
+        else:
+            write_int(0, buf)  # EvalConf    (0 key-value pairs)
+        write_udf(buf)
+
+        # Write the actual data
+        # header...
+        write_int(-8, target_buffer)  # SpecialLengths.START_OF_INIT_MESSAGE

Review Comment:
   Is there a way to do `SpecialLengths.START_OF_INIT_MESSAGE` instead of the 
magic number `-8`?



##########
python/pyspark/serializers.py:
##########
@@ -539,7 +541,7 @@ def loads(self, stream):
         elif length == SpecialLengths.NULL:
             return None
         s = stream.read(length)
-        return s.decode("utf-8") if self.use_unicode else s
+        return codecs.decode(s, "utf-8") if self.use_unicode else s

Review Comment:
   This is fine I think.



##########
python/pyspark/taskcontext.py:
##########
@@ -161,7 +161,7 @@ def _getOrCreate(cls: Type["TaskContext"]) -> "TaskContext":
         return cls._taskContext
 
     @classmethod
-    def _setTaskContext(cls: Type["TaskContext"], taskContext: "TaskContext") 
-> None:
+    def _setTaskContext(cls: Type["TaskContext"], taskContext: 
Optional["TaskContext"]) -> None:

Review Comment:
   Because you added signature to the function that calls it, which triggered 
the type check that has not been triggered before. This is a valid change.



##########
python/pyspark/worker_message.py:
##########
@@ -204,6 +205,9 @@ def from_stream(cls, stream: IO) -> "UDTFInfo":
         )
 
 
+UdfInfoType: TypeAlias = Union[memoryview, UDTFInfo, list[UDFInfo]]

Review Comment:
   Let's do `UDFInfoType` here.



##########
python/benchmarks/bench_eval_type.py:
##########
@@ -127,6 +127,47 @@ def write_preamble(cls, buf: io.BytesIO) -> None:
         cls.write_bool(False, buf)  # needs_broadcast_decryption_server
         write_int(0, buf)  # num_broadcast_variables
 
+    @classmethod
+    def write_init_message(
+        cls,
+        eval_type: int,
+        write_udf: Callable[[io.BufferedIOBase], None],
+        target_buffer: io.BytesIO,
+        runner_conf: dict[str, str] | None = None,
+        eval_conf: dict[str, str] | None = None,
+    ) -> None:
+        """Write the initial message with header, length + its data."""
+
+        # Write everything to a seperate buffer so we can
+        # determine the length of the initial message.
+        buf = io.BytesIO()
+        cls.write_preamble(buf)
+        write_int(eval_type, buf)
+        if runner_conf:
+            write_int(len(runner_conf), buf)
+            for k, v in runner_conf.items():
+                cls.write_utf8(k, buf)
+                cls.write_utf8(v, buf)
+        else:
+            write_int(0, buf)  # RunnerConf  (0 key-value pairs)
+        if eval_conf:
+            write_int(len(eval_conf), buf)
+            for k, v in eval_conf.items():
+                cls.write_utf8(k, buf)
+                cls.write_utf8(v, buf)
+        else:
+            write_int(0, buf)  # EvalConf    (0 key-value pairs)
+        write_udf(buf)
+
+        # Write the actual data
+        # header...
+        write_int(-8, target_buffer)  # SpecialLengths.START_OF_INIT_MESSAGE
+        print(buf.getbuffer().nbytes)

Review Comment:
   Why we need to print here?



##########
python/pyspark/worker_message.py:
##########
@@ -243,9 +247,14 @@ def from_stream(cls, stream: IO) -> "WorkerInitInfo":
             v = utf8_deserializer.loads(stream)
             eval_conf[k] = v
 
-        udf_info: Union[bytes, UDTFInfo, list[UDFInfo]]
+        udf_info: UdfInfoType
 
         if eval_type == PythonEvalType.NON_UDF:
+            # If the code in this branch changes

Review Comment:
   Does this comment make sense? When is `udf_info` `memoryview`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to