Hello everyone, let me be more precise about what I'm ultimately looking for,
because your example is correct and shows accurately how to
turn an array into a Row object.
I got that part working without any problems:

out.collect(Row.of(pelements.toArray()));

Then I printed the stream and the output is as expected:

5d2df2c2e7370c7843dad9ca,359731,13333196,156789925,619381

Now I need to register this DS as a table and here is basically how I'm
planning to do it:

tenv.registerDataStream("newtable", ds, "col1,col2,col3,col4,col5");

However, the registration fails with an error because I need to
specify the RowTypeInfo. This is the crux of the problem. Yes, I know I
could use something like:


TypeInformation<?>[] types = {
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO};

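DataStream<Row> ds = previousds.flatMap(new FlatMapFunction<List<Integer>, Row>() {
    @Override
    public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
        // Spread the list elements into the fields of a single Row
        out.collect(Row.of(value.toArray(new Integer[0])));
    }
}).returns(new RowTypeInfo(types)); // type hint: wrap the field types in a RowTypeInfo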


The problem with this approach is that I'm looking for a generic FlatMap
anonymous function that could return, on each run: 1. a different number of
elements in the array, and 2. elements of varying data types. In other
words, the schema is not fixed, so a hard-coded TypeInformation array would
not fit every execution.
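
To make the requirement concrete, here is a minimal sketch in plain Java (no
Flink dependency) of deriving the column types and column names from the
values instead of hard-coding them. The class InferRowTypes and the helpers
inferTypes and columnNames are hypothetical names used only for illustration.
The sketch assumes that a sample of the elements is available when the job is
built, since the type hint is supplied once at that point and not per record.
Turning each Class into a Flink TypeInformation (for instance with
TypeInformation.of(Class)) and wrapping the result in a RowTypeInfo is left
out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InferRowTypes {

    // Hypothetical helper: collects the runtime class of every element.
    static List<Class<?>> inferTypes(List<?> elements) {
        List<Class<?>> types = new ArrayList<>();
        for (Object e : elements) {
            if (e == null) {
                // A null value carries no type information to infer from.
                throw new IllegalArgumentException("cannot infer the type of a null element");
            }
            types.add(e.getClass());
        }
        return types;
    }

    // Hypothetical helper: builds "col1,col2,...,colN" for N columns.
    static String columnNames(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= n; i++) {
            if (i > 1) {
                sb.append(',');
            }
            sb.append("col").append(i);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Sample values taken from the printed row above.
        List<Object> pelements = Arrays.asList(
                "5d2df2c2e7370c7843dad9ca", 359731, 13333196, 156789925, 619381);

        List<Class<?>> types = inferTypes(pelements);
        for (Class<?> c : types) {
            System.out.println(c.getSimpleName());
        }
        System.out.println(columnNames(types.size()));
    }
}
```

The list of classes and the generated name string would then replace the
fixed types array and the literal "col1,col2,col3,col4,col5".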

How could I approach this?

thanks so much
AU


On Wed, Aug 7, 2019 at 3:57 AM Haibo Sun <sunhaib...@163.com> wrote:

> Hi Andres Angel,
>
> I guess people don't understand your problem (including me). I don't know
> if the following sample code is what you want, if not, can you describe the
> problem more clearly?
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromElements(Arrays.asList(1, 2, 3, 4, 5))
> .flatMap(new FlatMapFunction<List<Integer>, Row>() {
> @Override
> public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
> out.collect(Row.of(value.toArray(new Integer[0])));
> }
> }).print();
>
> env.execute("test job");
>
> Best,
> Haibo
>
> At 2019-07-30 02:30:27, "Andres Angel" <ingenieroandresan...@gmail.com>
> wrote:
>
> Hello everyone,
>
> I need to parse input data inside an anonymous function and turn it into
> several Row elements. Originally I would have done something like
> Row.of(1,2,3,4), but these elements can change on the fly as part of my
> function. This is why I decided to store them in a list, which right now
> looks something like this:
>
> [image: image.png]
>
> Now I need my out Collector to return a Row based on these elements. I
> checked the Flink documentation, but lambda functions are not
> supported:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html ,
> Then I thought I could loop over my ArrayList to fill a Tuple and pass this
> tuple as Row.of(myTuple):
>
>                     Tuple mytuple = Tuple.newInstance(5);
>                     for (int i = 0; i < pelements.size(); i++) {
>                         mytuple.setField(pelements.get(i), i);
>                     }
>                     out.collect(Row.of(mytuple));
>
>
> However, it doesn't work because the tuple is parsed as a single element in
> the sqlQuery step. How could I do something like:
>
> pelements.forEach(n->out.collect(Row.of(n)));
>
> Thanks so much
>
>
