Hello Chesnay,

Thank you, this solves the last problem that I had with my project!

I added code to append a random UUID to each data channel (see
https://github.com/GEOFBOT/flink/commit/b94b1249c4a014d8d1e412f7647ff192f3621b10).
However, is a UUID overkill? The channel only needs to be unique for each
combination of data set ID and subtask ID. UUID generation could slow down
Flink because random UUIDs are drawn from a cryptographically strong random
number generator. Would there be a significant risk of collision if we used
Random.nextInt() or the current UNIX timestamp instead?
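
As a rough way to size that risk, the collision question can be sketched
with the standard birthday-bound approximation. This is only a
back-of-the-envelope model, not a measurement of Flink: it assumes the
suffixes are drawn uniformly at random, and the function name and the
channel counts below are made up for illustration. A 32-bit space stands in
for Random.nextInt() and a 122-bit space for the random bits of a version 4
UUID.

```python
import math


def collision_probability(n_channels: int, space_bits: int) -> float:
    """Approximate P(at least one collision) among n_channels suffixes
    drawn uniformly at random from 2**space_bits possible values.

    Uses the birthday-bound approximation 1 - exp(-n(n-1) / (2N)).
    """
    space = 2.0 ** space_bits
    pairs = n_channels * (n_channels - 1) / 2.0
    # -expm1(-x) equals 1 - exp(-x) but stays accurate for very small x.
    return -math.expm1(-pairs / space)


if __name__ == "__main__":
    # Hypothetical channel counts, chosen only to show the trend.
    for n in (10, 1_000, 100_000):
        p_int = collision_probability(n, 32)    # 32-bit random int
        p_uuid = collision_probability(n, 122)  # random bits in a v4 UUID
        print(f"n={n:>7}  32-bit: {p_int:.3e}  122-bit: {p_uuid:.3e}")
```

Only channels that share the same data set ID and subtask ID can collide
with each other, so the relevant n is probably small. A timestamp is not a
random draw, so this model does not apply to it: two channels created
within the same clock tick would get the same name.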

Cheers,
Geoffrey

On Sun, Oct 23, 2016 at 4:14 PM Chesnay Schepler <ches...@apache.org> wrote:

> Hey Geoffrey,
>
> I believe what we've got here is multiple operators sending data over the
> same channel.
>
> The data transfer between the Java and Python processes uses memory-mapped
> files. The file name is based on the ID of the set and the subtask index
> of the operator, and is thus deterministic.
> (This is mostly for historical reasons.)
>
> The assumption was that the ID was unique, which obviously no longer holds
> when you re-use sets. And surprisingly, it never occurred to me that this
> might happen :(
>
> So, if this is indeed the problem, then the fix is trivial: the file names
> are generated within PythonStreamer#startPython(), and all we have to do
> is introduce a random component.
>
> Regards,
> Chesnay
>
> On 23.10.2016 04:58, Geoffrey Mon wrote:
> > Hello all,
> >
> > Thanks to Chesnay for the assistance with my other issues. I have one
> > final issue that I can't figure out how to solve, which should hopefully
> > be the last one. I have been working on implementing bulk iterations in
> > the Python API and using said iterations in a research project.
> >
> > At the moment, I get an interesting exception coming from some
> > deserializer functions related to the iterators in the Python API. When I
> > use a data set (in the example, named "S") two times, once in a join with
> > an iterative data set within an iteration and then again after the
> > iteration in a join with the iteration result, such as in
> > https://gist.github.com/GEOFBOT/d670f567f8c886572c8715a6058f8b34, I
> > usually get the following exception:
> > Traceback (most recent call last):
> >   File "/tmp/flink-dist-cache-.../flink/plan.py", line 103, in <module>
> >     env.execute(local=True)
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/plan/Environment.py", line 198, in execute
> >     operator._go()
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/functions/Function.py", line 63, in _go
> >     self._receive_broadcast_variables()
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/functions/Function.py", line 75, in _receive_broadcast_variables
> >     serializer_data = _get_deserializer(con.read_secondary, self._env._types)
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 262, in _get_deserializer
> >     return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 262, in _get_deserializer
> >     return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 262, in _get_deserializer
> >     return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 262, in _get_deserializer
> >     return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 262, in _get_deserializer
> >     return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 285, in _get_deserializer
> >     raise Exception("Unable to find deserializer for type ID " + str(ord(type)))
> > Exception: Unable to find deserializer for type ID 0
> >
> > (I believe that another variant of the error message with "type ID 63"
> > did appear some of the time, suggesting a race condition or similar, but
> > I cannot reproduce this at the time of writing.) Removing the second
> > reuse of the data set eliminates the problem.
> >
> > While tracking down this problem, I found that if I have a data set
> > (named "S" again in this example) that has zip_with_index and reduce
> > group operators applied, and I join it with the result of any iterative
> > data set, then I get a similar exception. An example file that causes
> > the issue is here:
> > https://gist.github.com/GEOFBOT/8490cc65155862f63306e322d38d276c
> >
> > In this case, I get:
> > Traceback (most recent call last):
> >   File "/tmp/flink-dist-cache-.../.../flink/plan.py", line 50, in <module>
> >     env.execute(local=True)
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/plan/Environment.py", line 198, in execute
> >     operator._go()
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/functions/Function.py", line 64, in _go
> >     self._run()
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/functions/JoinFunction.py", line 28, in _run
> >     for value in self._iterator:
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 223, in next
> >     return self._deserializer.deserialize(self._read)
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 337, in deserialize
> >     f2 = self._d2.deserialize(read)
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 358, in deserialize
> >     return tuple([s.deserialize(read) for s in self._deserializer])
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 384, in deserialize
> >     return unpack(">d", read(8))[0]
> >   File "/tmp/flink-dist-cache-.../.../flink/flink/connection/Iterator.py", line 188, in _read
> >     print x, str.decode(x)
> > UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 1: ordinal not in range(128)
> >
> > Interestingly, if the "S" data set is changed to have only "0.0"s (such
> > as by commenting out L27-30), there is no error. However, this may just
> > be a way of tricking the deserializer into working.
> >
> > If anyone has any information or assistance that could help me solve this
> > issue, I would really appreciate it. It would help me with my project and
> > also iron out any bugs that may be present in my Python API bulk
> > iteration implementation so that the feature is ready for production use.
> >
> > Cheers,
> > Geoffrey
> >
>
>
