Hi Andres,

Thanks for the detailed explanation.

> but apparently I can't create a new DS within a map function


If you create a new DS inside the map function, you'll create as many DSs
as there are elements in the old DS, which doesn't seem to be what you
want. I suppose you want to create a DS<Tuple> from a DS<String>. If that
is the case, you can write something like this:

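import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void main(String[] args) throws Exception {
   StreamExecutionEnvironment env =
         StreamExecutionEnvironment.getExecutionEnvironment();

   DataStream<String> dsString =
         env.fromElements("1,a,1.1", "2,b,2.2,-2", "3,c", "4,d,4.4");

   // Pick the tuple arity from the number of comma-separated fields.
   DataStream<Object> dsTuple = dsString.map(s -> {
      String[] split = s.split(",");
      if (split.length == 2) {
         return new Tuple2<>(Integer.valueOf(split[0]), split[1]);
      } else if (split.length == 3) {
         return new Tuple3<>(Integer.valueOf(split[0]), split[1],
               Double.valueOf(split[2]));
      } else {
         // Any other length is treated as 4 fields; shorter input throws here.
         return new Tuple4<>(Integer.valueOf(split[0]), split[1],
               Double.valueOf(split[2]), Long.valueOf(split[3]));
      }
   });

   dsTuple.print();
   env.execute();
}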


> How dynamically create the DS<Tuple>


As you can see in the code above, I did not create a DS<Tuple> but a
DS<Object>, because Tuple can't be used directly. It seems that you want to
turn this new DS into a table, but if records have different numbers of
columns this is not good practice, since the records no longer share one
schema. As a workaround, you can fill a column with null for records that
don't have it.
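
The null-filling workaround could look roughly like the plain-Java sketch
below. It is only an illustration: PadFields and parse are names I made up
(they are not Flink APIs), and the fixed column count is an assumption you
would pick yourself.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: pads a comma-separated record
// to a fixed number of columns so that every record has the same width.
class PadFields {
    static String[] parse(String line, int width) {
        // A limit of -1 keeps trailing empty fields such as "1,a,".
        String[] split = line.split(",", -1);
        if (split.length > width) {
            throw new IllegalArgumentException(
                "record has " + split.length + " fields, expected at most " + width);
        }
        // Arrays.copyOf fills the slots beyond split.length with null.
        return Arrays.copyOf(split, width);
    }
}
```

Inside the map function you could then call PadFields.parse(s, 4) and
convert each non-null field to the column type you need, so that all
records end up with the same 4 columns.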

Hope this solves your problem. If you have any other problems feel free to
write back.

Andres Angel <ingenieroandresan...@gmail.com> wrote on Wed, Jul 24, 2019 at 10:50 AM:

> Hello,
>
> Let me list properly the questions I have:
>
> * How to capture the content of a DataStream in a string? On this point,
> basically I have a DS<String>, and the only ways I can use the content are
> within a map function, print, storing the content somewhere, or SQL queries.
> The point is that I need the content because, depending on that content, I
> need to create another DS and later register it in the table environment,
> which means I need the values but likewise the headers, and all of that
> info is within the DS<String>. The first option I had was to play
> with the map function, but apparently I can't create a new DS within a map
> function, much less register it as a new table.
>
> My second option on this point could be to create a sort of public variable
> to store the DS<String> content and then create my UDF, but sadly this is
> not allowed either. My options in this case would be to somehow store
> the content of the DS<String> in a new public variable, to turn the
> DS<String> into a String, or to store the content in a file, read the file,
> and start over parsing the content to provide the header and content for
> the new DS.
>
> * How dynamically create the DS<Tuple>: well, basically after parsing the
> content from the point above I might end up with an array of fields,
> sometimes 4, 3 or 2, it doesn't matter; then I might need to switch between
> different tuples or turn my content into Row to create a DS<Row>.
>
> I'm looking forward to reading your comments.
>
> thanks so much
>
> On Tue, Jul 23, 2019 at 10:34 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi Andres,
>>
>> Sorry, I can't quite get your question... Do you mean how to split
>> the string into fields?
>>
>> There is a `split` method in Java. You can give it a regexp and it will
>> return an array containing all the split fields.
>>
>> Andres Angel <ingenieroandresan...@gmail.com> wrote on Wed, Jul 24, 2019 at 10:28 AM:
>>
>>> Hello Weng,
>>>
>>> thanks for your reply, however I'm struggling to somehow read into a
>>> String the content of my DS, whose payload defines how many fields the
>>> message contains. That is the reason why I thought of a map
>>> function for that DS.
>>>
>>> The Tuple part can change over time; it can even go from 3 or 4 fields
>>> to 2, and it can keep changing. How could I approach this challenge?
>>>
>>> thanks so much
>>>
>>> On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng <tsreape...@gmail.com>
>>> wrote:
>>>
>>>> Hi Andres,
>>>>
>>>> Are the payloads strings? If yes, one method is to store them
>>>> as strings and process them further with user-defined functions when you
>>>> need to use them.
>>>>
>>>> Another method is to store them in arrays.
>>>>
>>>> Also, if the types of the first 3 fields are the same for the first and
>>>> second payload, you can use a Tuple4<> and set the last element to null for
>>>> the first payload.
>>>>
>>>> Andres Angel <ingenieroandresan...@gmail.com> wrote on Wed, Jul 24, 2019 at 10:09 AM:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> I need to set the size of the Tuple that feeds a DS dynamically; let
>>>>> me explain it better. Let's assume the first payload I read has this
>>>>> format: "field1,field2,field3". This might require a Tuple3<>, but my
>>>>> payload later can be "field1,field2,field3,field4", so my Tuple might
>>>>> need to be redefined on the fly and become a Tuple4<>.
>>>>>
>>>>> How could I create this dynamically, any idea?
>>>>>
>>>>> Thanks so much
>>>>>
>>>>
