Hi Andres,


I’d like to share my thoughts:

When you register a “Table”, you need to specify its “schema”, so how can you 
register the table when the number of elements/columns and the data types are 
both nondeterministic?

Correct me if I misunderstood your meaning.
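
One possible way around this, as a rough sketch only: fix the schema up front 
by choosing a maximum number of columns and carrying every value as a String, 
then pad or truncate each list to that width. The helper below is plain Java 
and is not part of Flink; the names FixedWidthFields, toFixedWidth and 
fieldNames are made up for illustration. In the job, the resulting array could 
be handed to Row.of((Object[]) fields), with a RowTypeInfo built only from 
BasicTypeInfo.STRING_TYPE_INFO entries. I have not tested this inside a Flink 
job.

```java
import java.util.List;

// Hypothetical helper, not part of Flink: normalizes a variable-length,
// mixed-type list into a fixed number of String fields so that the table
// schema can be declared once, before the job runs.
final class FixedWidthFields {

    private FixedWidthFields() {}

    // Returns exactly `width` fields. Each element is rendered with
    // String.valueOf, missing positions stay null, extra elements are dropped.
    static String[] toFixedWidth(List<?> elements, int width) {
        String[] fields = new String[width];
        int n = Math.min(width, elements.size());
        for (int i = 0; i < n; i++) {
            Object e = elements.get(i);
            fields[i] = (e == null) ? null : String.valueOf(e);
        }
        return fields;
    }

    // Builds a comma-separated field list such as "col1,col2,col3",
    // in the same shape as the one passed to registerDataStream in this thread.
    static String fieldNames(int width) {
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= width; i++) {
            if (i > 1) {
                sb.append(',');
            }
            sb.append("col").append(i);
        }
        return sb.toString();
    }
}
```

The trade-off is that numeric columns arrive as strings and would have to be 
cast back in the SQL query, and the maximum width has to be agreed on before 
the table is registered.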



Best,

Victor

From: Andres Angel <ingenieroandresan...@gmail.com>
Date: Wednesday, August 7, 2019 at 9:55 PM
To: Haibo Sun <sunhaib...@163.com>
Cc: user <user@flink.apache.org>
Subject: Re: FlatMap returning Row<> based on ArrayList elements()

Hello everyone, let me be more precise about what I'm ultimately looking for, 
because your example is right and shows accurately how to turn an array into a 
Row object.
I have done that without trouble:

out.collect(Row.of(pelements.toArray()));

Then I printed and the outcome is as expected:

5d2df2c2e7370c7843dad9ca,359731,13333196,156789925,619381

Now I need to register this DS as a table and here is basically how I'm 
planning to do it:

tenv.registerDataStream("newtable", ds, "col1,col2,col3,col4,col5");

However, this returns an error on the DS registration because I need to 
specify the RowTypeInfo. This is the crux of the problem: yes, I know I could 
use something like:


TypeInformation<?>[] types = {
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO};

DataStream<Row> ds = previousds.flatMap(new FlatMapFunction<List<Integer>, Row>() {
    @Override
    public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
        out.collect(Row.of(value.toArray(new Integer[0])));
    }
}).returns(new RowTypeInfo(types));


The problem with this approach is that I'm looking for a generic anonymous 
FlatMap function that can return, on each execution, 1. a different number of 
elements in the array and 2. elements of varying data types. Neither is fixed, 
whereas a hard-coded TypeInformation would fix both for every execution.

How could I approach this?

thanks so much
AU


On Wed, Aug 7, 2019 at 3:57 AM Haibo Sun <sunhaib...@163.com> wrote:
Hi Andres Angel,

I guess people (including me) don't understand your problem. I don't know 
whether the following sample code is what you want; if not, can you describe 
the problem more clearly?

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Arrays.asList(1, 2, 3, 4, 5))
    .flatMap(new FlatMapFunction<List<Integer>, Row>() {
        @Override
        public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
            out.collect(Row.of(value.toArray(new Integer[0])));
        }
    }).print();

env.execute("test job");

Best,
Haibo

At 2019-07-30 02:30:27, "Andres Angel" <ingenieroandresan...@gmail.com> wrote:

Hello everyone,

I need to parse input data inside an anonymous function and turn it into 
several Row elements. Originally I would have done something like 
Row.of(1,2,3,4), but these elements can change on the fly as part of my 
function. This is why I decided to store them in a list, and right now it 
looks something like this:

[image.png]

Now I need to emit a Row through my Collector out, built from these elements. 
I checked the Flink documentation, but lambda functions are not supported: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html . 
Then I thought, OK, I can loop over my ArrayList, fill a Tuple, and pass this 
tuple as Row.of(myTuple):

                    Tuple mytuple = Tuple.newInstance(5);
                    for (int i = 0; i < pelements.size(); i++) {
                        mytuple.setField(pelements.get(i), i);
                    }
                    out.collect(Row.of(mytuple));


However, it doesn't work because the tuple is parsed as a single element in 
the sqlQuery step. How could I do something like:

pelements.forEach(n->out.collect(Row.of(n)));

Thanks so much
