Hi mates! I'm trying to find the best way to persist data in a columnar format (Parquet) using Flink. Each event contains a fixed list of properties and a variable list of properties defined by the user, and I would like to save the user-defined properties into separate columns on the fly.
Here is an example of my events:

```json
[
  {
    "eventId": "7e2b33c8-9c00-42ed-b780-dbbd8b0f6562",
    "timestamp": 123,
    "attributes": { "gender": "male", "geo": "Germany" }
  },
  {
    "eventId": "7e2b33c8-9c00-42ed-b780-dbbd8b0f6562",
    "timestamp": 123,
    "attributes": { "car": "big-cool-car", "phone": "iphone" }
  }
]
```

As a result, I would like to have a table with the following columns:

*eventId | timestamp | gender | geo | car | phone*

I've looked into the streaming file sink, but it requires defining a schema before starting the job, which is not possible in my case. I also remembered the *explode* SQL function that could help me in standard SQL, but it doesn't exist in the Flink Table API. I have found that since version *1.13.0* Flink supports creating rows by name using *Row.withNames()*, so I guess this can be the key that solves my problem. Here is what the Javadoc says:

*Name-based field mode: withNames() creates a variable-length row. The fields can be accessed by name using getField(String) and setField(String, Object). Every field is initialized during the first call to setField(String, Object) for the given name. However, the framework will initialize missing fields with null and reorder all fields once more type information is available during serialization or input conversion. Thus, even name-based rows eventually become fixed-length composite types with a deterministic field order.*
*Name-based rows perform worse than position-based rows but simplify row creation and code readability.*

So it seems that all I need is to transform my events into records manually and persist the resulting table to the file system. But my no-op demo example fails with an exception. Here it is:

```java
public class TestApp {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Row row1 = Row.withNames();
        row1.setField("a", "fb1");
        row1.setField("b", "gmail1");
        row1.setField("c", "vk1");

        Row row2 = Row.withNames();
        row2.setField("b", "ok2");
        row2.setField("c", "gmail2");

        tableEnv.fromValues(row1, row2).printSchema();
    }
}
```

Here is the stack trace of the exception:

```
java.lang.IllegalArgumentException: Accessing a field by position is not supported in name-based field mode.
	at org.apache.flink.types.Row.getField(Row.java:257)
	at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at org.apache.flink.table.expressions.ApiExpressionUtils.convertRow(ApiExpressionUtils.java:123)
	at org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression(ApiExpressionUtils.java:103)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.fromValues(TableEnvironmentImpl.java:359)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.fromValues(TableEnvironmentImpl.java:334)
```

Maybe someone has tried this feature and can guess what's wrong with the current code and how to make it work. Anyway, I have a fallback - accumulate a batch of events, define the schema for them, and write them to the file system manually - but I still hope that I can do this in a more elegant way.

Thx for your advice and time!

--
Best Regards,
*Sharipov Rinat*

CleverDATA
make your data clever
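P.S. Judging by the stack trace, `fromValues(...)` converts rows by position, which is why the name-based rows blow up. One workaround I'm considering (a rough sketch - the explicit schema and `null` padding are my own assumptions, not a confirmed pattern) is to compute the union of attribute names first and then pass an explicit row type to `fromValues`, falling back to position-based rows:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TestAppWorkaround {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Declare the union of all field names up front ("a", "b", "c") and use
        // position-based rows, padding the fields a given row lacks with null.
        tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("a", DataTypes.STRING()),
                        DataTypes.FIELD("b", DataTypes.STRING()),
                        DataTypes.FIELD("c", DataTypes.STRING())),
                Row.of("fb1", "gmail1", "vk1"),
                Row.of(null, "ok2", "gmail2"))
            .printSchema();
    }
}
```

Of course this still means the full set of attribute names must be known before the table is built, so it only moves the schema-inference step into my own code rather than removing it.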