Re: How many events can Flink process each second
Hello A.V. It depends on the underlying resources you are planning for your jobs; memory and processing power will play a principal role in the answer. Keep in mind that you can break your job down into a number of parallel tasks, either for the whole environment or for a specific task within your pipeline. I have scaled my jobs up to around 1M events per second without any trouble, but it likewise depends on how complex your pipeline is. Beyond the parallel processing capabilities, your pipeline likely applies enrichment and transformations that read data from an external ecosystem, which can add overhead to your benchmark. Briefly, you should demo your own scenario and draw your own conclusions, but without any doubt Flink is capable of processing a very large number of stream records. Thanks, AU On Wed, Oct 23, 2019 at 10:24 AM A. V. wrote: > Hi, > > My boss wants to know how many events Flink can process, analyse etc. per > second? I can't find this in the documentation.
Flink KPL based on a custom class object
Hello folks, I need to create a Flink producer for Kinesis capable of sinking a payload based on a custom class object I have built. The official documentation comes with this basic example, assuming that we are sinking a String object: FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig); Based on that example I tried to implement my custom producer so that it returns the object I need: FlinkKinesisProducer<myObject> kinesis = new FlinkKinesisProducer<>(new myObject(), producerConfig); The problem with this approach is that I get an error right on the FlinkKinesisProducer<>, saying that it is unable to infer the type arguments. What would be the right way to sink this custom payload object? thanks so much
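For what it's worth, the constructor argument in the official example is a SerializationSchema<String> (SimpleStringSchema), not the payload itself, so passing `new myObject()` cannot type-check; one would implement a SerializationSchema<myObject> instead. Below is a plain-Java sketch of the serialize logic such a schema would wrap; the `Event` class and its JSON layout are invented for illustration.

```java
import java.nio.charset.StandardCharsets;

public class EventSchemaSketch {

    // Hypothetical custom payload class, standing in for "myObject".
    public static class Event {
        public final String id;
        public final long count;
        public Event(String id, long count) { this.id = id; this.count = count; }
    }

    // The body one would place inside a Flink
    // SerializationSchema<Event>.serialize(Event) implementation:
    // the Kinesis producer needs bytes, not the object itself.
    public static byte[] serialize(Event e) {
        String json = "{\"id\":\"" + e.id + "\",\"count\":" + e.count + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new Event("a1", 3));
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // {"id":"a1","count":3}
    }
}
```

In the actual job one would then construct `new FlinkKinesisProducer<>(schema, producerConfig)` with an anonymous SerializationSchema<Event> whose serialize() delegates to this logic.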
Updating tables in the env after they have been registered
Hello everyone, Our use case assumes that we execute a job where we load some data from Redis and turn it into DSs to register them as tables. However, it's possible that after this step has completed the data changes, and we may need to read it again to keep the table contents up to date. Here is where our doubt comes up: we need to know whether or not it is possible, within a job, to reload the table contents at a certain frequency, without restarting the job and having to register the tables again from the beginning. I appreciate your comments and any way we could approach this use case. Thanks so much
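One common shape for this kind of periodic reload, sketched in plain Java with no Flink or Redis dependency: `loader` is a hypothetical stand-in for the Redis read, and the atomic snapshot swap is what a scheduled thread (e.g. set up in a RichFunction's open()) could drive; whether that fits re-registering Table API tables is exactly the open question here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class ReloadableLookup {
    // Hypothetical loader standing in for the Redis read that builds
    // the data backing a registered table.
    private final Supplier<Map<String, String>> loader;
    private final AtomicReference<Map<String, String>> snapshot = new AtomicReference<>();

    public ReloadableLookup(Supplier<Map<String, String>> loader) {
        this.loader = loader;
        refresh();
    }

    // Re-read the source and swap the snapshot atomically,
    // without tearing anything down or restarting.
    public void refresh() {
        snapshot.set(loader.get());
    }

    public String get(String key) {
        return snapshot.get().get(key);
    }

    public static void main(String[] args) {
        List<Map<String, String>> source = new ArrayList<>();
        source.add(Map.of("k", "v1"));
        ReloadableLookup lookup = new ReloadableLookup(() -> source.get(0));
        System.out.println(lookup.get("k")); // v1
        source.set(0, Map.of("k", "v2"));    // the external data changes
        lookup.refresh();
        System.out.println(lookup.get("k")); // v2
    }
}
```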
Re: FlatMap returning Row<> based on ArrayList elements()
Hello Victor, You are totally right. So now this turns into: is Flink capable of handling cases where the type info in the row must be defined at runtime, with the Table inferring the columns separated by comma or something similar? thanks AU On Wed, Aug 7, 2019 at 10:33 AM Victor Wong wrote: > Hi Andres, > > I'd like to share my thoughts: > > When you register a "Table", you need to specify its "schema", so how can > you register the table when the number of elements/columns and data types > are both nondeterministic? > > Correct me if I misunderstood your meaning. > > Best, > Victor > > *From: *Andres Angel > *Date: *Wednesday, August 7, 2019 at 9:55 PM > *To: *Haibo Sun > *Cc: *user > *Subject: *Re: FlatMap returning Row<> based on ArrayList elements() > > Hello everyone, let me be more precise about what I'm looking for, because your example is right and very accurate about how to turn an array into a Row() object. I have done it seamlessly: > > out.collect(Row.of(pelements.toArray())); > > Then I printed it and the outcome is as expected: > > 5d2df2c2e7370c7843dad9ca,359731,1196,156789925,619381 > > Now I need to register this DS as a table, and here is basically how I'm planning to do it: > > tenv.registerDataStream("newtable", ds, "col1,col2,col3,col4,col5"); > > However, this returns an error on the DS registration, because I need to specify the RowTypeInfo.
Here is the big deal, because yes, I know I would be > able to use something like: > > TypeInformation<?>[] types = { > BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO}; > > DataStream<Row> ds = previousds.flatMap(new FlatMapFunction<List<Integer>, Row>() { > @Override > public void flatMap(List<Integer> value, Collector<Row> out) throws > Exception { > out.collect(Row.of(value.toArray(new Integer[0]))); > } > }).returns(new RowTypeInfo(types)); > > The problem with this approach is that I'm looking for a standard FlatMap > anonymous function that could return, every time: 1. a different number of > elements within the array, and 2. data types that can likewise vary. I mean, > they are not fixed, so my TypeInformation return would change on every > execution. > > How could I approach this? > > thanks so much > > AU > > On Wed, Aug 7, 2019 at 3:57 AM Haibo Sun wrote: > > Hi Andres Angel, > > I guess people don't understand your problem (including me). I don't know > if the following sample code is what you want; if not, can you describe the > problem more clearly? > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.fromElements(Arrays.asList(1, 2, 3, 4, 5)) > .flatMap(new FlatMapFunction<List<Integer>, Row>() { > @Override > public void flatMap(List<Integer> value, Collector<Row> out) throws > Exception { > out.collect(Row.of(value.toArray(new Integer[0]))); > } > }).print(); > > env.execute("test job"); > > Best, > > Haibo > > At 2019-07-30 02:30:27, "Andres Angel" > wrote: > > Hello everyone, > > I need to parse, in an anonymous function, input data to turn it into > several Row elements.
Originally I would have done something like > Row.of(1,2,3,4), but these elements can change on the fly as part of my > function. This is why I have decided to store them in a list, and right now > it looks something like this: > > [image: image.png] > > Now I need to return, via my out Collector, a Row<> based on these elements. I > checked the Flink documentation, but lambda functions are not > supported: > https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html > Then I thought, OK, I can load my ArrayList into a Tuple and pass this tuple as > Row.of(myTuple): > > Tuple mytuple = Tuple.newInstance(5); > for (int i = 0; i < pelements.size(); i++) { > mytuple.setField(pelements.get(i), i); > } > out.collect(Row.of(mytuple)); > > However, it doesn't work, because this is parsed as 1 element by the > sqlQuery step. How could I do something like: > > pelements.forEach(n -> out.collect(Row.of(n))); > > Thanks so much > >
Re: FlatMap returning Row<> based on ArrayList elements()
Hello everyone, let me be more precise about what I'm looking for, because your example is right and very accurate about how to turn an array into a Row() object. I have done it seamlessly: out.collect(Row.of(pelements.toArray())); Then I printed it and the outcome is as expected: 5d2df2c2e7370c7843dad9ca,359731,1196,156789925,619381 Now I need to register this DS as a table, and here is basically how I'm planning to do it: tenv.registerDataStream("newtable", ds, "col1,col2,col3,col4,col5"); However, this returns an error on the DS registration, because I need to specify the RowTypeInfo. Here is the big deal, because yes, I know I would be able to use something like: TypeInformation<?>[] types = { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}; DataStream<Row> ds = previousds.flatMap(new FlatMapFunction<List<Integer>, Row>() { @Override public void flatMap(List<Integer> value, Collector<Row> out) throws Exception { out.collect(Row.of(value.toArray(new Integer[0]))); } }).returns(new RowTypeInfo(types)); The problem with this approach is that I'm looking for a standard FlatMap anonymous function that could return, every time: 1. a different number of elements within the array, and 2. data types that can likewise vary. I mean, they are not fixed, so my TypeInformation return would change on every execution. How could I approach this? thanks so much AU On Wed, Aug 7, 2019 at 3:57 AM Haibo Sun wrote: > Hi Andres Angel, > > I guess people don't understand your problem (including me). I don't know > if the following sample code is what you want; if not, can you describe the > problem more clearly?
> > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.fromElements(Arrays.asList(1, 2, 3, 4, 5)) > .flatMap(new FlatMapFunction<List<Integer>, Row>() { > @Override > public void flatMap(List<Integer> value, Collector<Row> out) throws > Exception { > out.collect(Row.of(value.toArray(new Integer[0]))); > } > }).print(); > > env.execute("test job"); > > Best, > Haibo > > At 2019-07-30 02:30:27, "Andres Angel" > wrote: > > Hello everyone, > > I need to parse, in an anonymous function, input data to turn it into > several Row elements. Originally I would have done something like > Row.of(1,2,3,4), but these elements can change on the fly as part of my > function. This is why I have decided to store them in a list, and right now > it looks something like this: > > [image: image.png] > > Now I need to return, via my out Collector, a Row<> based on these elements. I > checked the Flink documentation, but lambda functions are not > supported: > https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html > Then I thought, OK, I can load my ArrayList into a Tuple and pass this tuple as > Row.of(myTuple): > > Tuple mytuple = Tuple.newInstance(5); > for (int i = 0; i < pelements.size(); i++) { > mytuple.setField(pelements.get(i), i); > } > out.collect(Row.of(mytuple)); > > However, it doesn't work, because this is parsed as 1 element by the > sqlQuery step. How could I do something like: > > pelements.forEach(n -> out.collect(Row.of(n))); > > Thanks so much > >
FlatMap returning Row<> based on ArrayList elements()
Hello everyone, I need to parse, in an anonymous function, input data to turn it into several Row elements. Originally I would have done something like Row.of(1,2,3,4), but these elements can change on the fly as part of my function. This is why I have decided to store them in a list, and right now it looks something like this: [image: image.png] Now I need to return, via my out Collector, a Row<> based on these elements. I checked the Flink documentation, but lambda functions are not supported: https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html Then I thought, OK, I can load my ArrayList into a Tuple and pass this tuple as Row.of(myTuple): Tuple mytuple = Tuple.newInstance(5); for (int i = 0; i < pelements.size(); i++) { mytuple.setField(pelements.get(i), i); } out.collect(Row.of(mytuple)); However, it doesn't work, because this is parsed as 1 element by the sqlQuery step. How could I do something like: pelements.forEach(n -> out.collect(Row.of(n))); Thanks so much
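Since Row.of is declared as a varargs method (Row.of(Object... values)), a single Tuple argument becomes one wrapped field, while an Object[] is spread into one field per element; that is the difference between the working toArray() call in the replies above and the one-element result this message describes. A plain-Java illustration, where fieldCount is a hypothetical stand-in for Row.of:

```java
public class VarargsDemo {
    // Hypothetical stand-in for Row.of(Object... values):
    // report how many fields the resulting row would get.
    public static int fieldCount(Object... values) {
        return values.length;
    }

    public static void main(String[] args) {
        Object[] elements = {1, 2, 3, 4, 5}; // e.g. pelements.toArray()
        Object tupleLike = new Object();     // a single object, like a Tuple

        System.out.println(fieldCount(elements));          // 5: the array is spread
        System.out.println(fieldCount(tupleLike));         // 1: one wrapped field
        System.out.println(fieldCount((Object) elements)); // 1: a cast defeats spreading
    }
}
```

So `out.collect(Row.of(pelements.toArray()))` produces the 5-field row, while `Row.of(mytuple)` can only ever produce a 1-field row.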
Assign a Row.of(ListsElements) exception
Hello everyone, I have a list with a bunch of elements, and I need to create a Row.of() based on all of them. I tried to apply a lambda function for this purpose: mylist.forEach(n -> out.collect(Row.of(n))); but I got the exception below: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator How should I properly feed the Row based on the list elements? thanks so much AU
Re: LEFT JOIN issue SQL API
Thanks so much for your answer, but then how should I perform such a comparison? Which options do we have? Thanks On Wed, Jul 24, 2019 at 10:01 PM, Ruidong Li wrote: > Hi, it's because outer joins generate retractions. Consider the > behavior of a LEFT OUTER JOIN: > > 1. A left record arrives with no matching right record, so +(left, null) will > be generated. > 2. A right record arrives; the previous result should be retracted, so > -(left, null) and +(left, right) will be generated. > > Andres Angel 于2019年7月25日周四 上午8:15写道: > >> Hello guys, I have registered some tables in the table environment, and now I'm trying >> to query them using a LEFT JOIN, like the example below: >> >> Table fullenrichment = tenv.sqlQuery( >> "SELECT pp.a,pp.b,pp.c,pp.d,pp.a " + >> " FROM t1 pp LEFT JOIN t2 ent" + >> " ON pp.b = ent.b" + >> " LEFT JOIN t3 act " + >> " ON pp.a = act.a " >> ); >> >> Once the query is complete I need to read this into a Row DS: >> >> DataStream<Row> results = tenv.toAppendStream(fullenrichment, Row.class); >> >> I'm getting the following error; however, if I execute the same code with >> an INNER JOIN instead of the LEFT JOIN, the error vanishes and the >> code works. Why this behavior? >> >> 1930 [main] INFO >> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - >> Flink Kinesis Consumer is going to read the following streams: >> tr-stream-ingestion, >> 3698 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - >> class org.apache.flink.types.Row does not contain a getter for field fields >> 3698 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - >> class org.apache.flink.types.Row does not contain a setter for field fields >> 3698 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - >> Class class org.apache.flink.types.Row cannot be used as a POJO type >> because not all fields are valid POJO fields, and must be processed as >> GenericType. 
Please read the Flink documentation on "Data Types & >> Serialization" for details of the effect on performance. >> 3730 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - >> class org.apache.flink.types.Row does not contain a getter for field fields >> 3730 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - >> class org.apache.flink.types.Row does not contain a setter for field fields >> 3730 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - >> Class class org.apache.flink.types.Row cannot be used as a POJO type >> because not all fields are valid POJO fields, and must be processed as >> GenericType. Please read the Flink documentation on "Data Types & >> Serialization" for details of the effect on performance. >> 3753 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - >> class org.apache.flink.types.Row does not contain a getter for field fields >> 3753 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - >> class org.apache.flink.types.Row does not contain a setter for field fields >> 3753 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - >> Class class org.apache.flink.types.Row cannot be used as a POJO type >> because not all fields are valid POJO fields, and must be processed as >> GenericType. Please read the Flink documentation on "Data Types & >> Serialization" for details of the effect on performance. >> Exception in thread "main" org.apache.flink.table.api.TableException: >> Table is not an append-only table. Use the toRetractStream() in order to >> handle add and retract messages. 
>> at >> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:920) >> at >> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896) >> at >> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308) >> at >> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262) >> at consumer.trconsumer.main(trconsumer.java:180) >> >
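A toy, Flink-free illustration of the retraction sequence Ruidong describes (the field names are made up): this changelog is why toAppendStream rejects the result, while toRetractStream, which carries an add/retract flag alongside each Row, accepts it.

```java
import java.util.ArrayList;
import java.util.List;

public class LeftJoinRetraction {
    // Build the changelog a streaming LEFT JOIN emits when the left
    // row arrives before its matching right row.
    public static List<String> changelogFor(String left, String right) {
        List<String> log = new ArrayList<>();
        log.add("+(" + left + ", null)");          // 1. left arrives, no match yet
        log.add("-(" + left + ", null)");          // 2. right arrives: retract...
        log.add("+(" + left + ", " + right + ")"); //    ...and emit the joined row
        return log;
    }

    public static void main(String[] args) {
        changelogFor("pp.b=1", "ent.b=1").forEach(System.out::println);
    }
}
```

The minus entry is the retraction an append-only stream cannot represent, hence the suggestion to use tenv.toRetractStream(fullenrichment, Row.class) instead.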
LEFT JOIN issue SQL API
Hello guys, I have registered some tables in the table environment, and now I'm trying to query them using a LEFT JOIN, like the example below: Table fullenrichment = tenv.sqlQuery( "SELECT pp.a,pp.b,pp.c,pp.d,pp.a " + " FROM t1 pp LEFT JOIN t2 ent" + " ON pp.b = ent.b" + " LEFT JOIN t3 act " + " ON pp.a = act.a " ); Once the query is complete I need to read this into a Row DS: DataStream<Row> results = tenv.toAppendStream(fullenrichment, Row.class); I'm getting the following error; however, if I execute the same code with an INNER JOIN instead of the LEFT JOIN, the error vanishes and the code works. Why this behavior? 1930 [main] INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Flink Kinesis Consumer is going to read the following streams: tr-stream-ingestion, 3698 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a getter for field fields 3698 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a setter for field fields 3698 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. 3730 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a getter for field fields 3730 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a setter for field fields 3730 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. 3753 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a getter for field fields 3753 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row does not contain a setter for field fields 3753 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. Exception in thread "main" org.apache.flink.table.api.TableException: Table is not an append-only table. Use the toRetractStream() in order to handle add and retract messages. at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:920) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896) at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308) at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262) at consumer.trconsumer.main(trconsumer.java:180)
sqlQuery split string
Hello everyone, Following the currently available functions https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html, how could I split a string column by a character? Example column content: col = a,b,c Query: SELECT col FROM tenv Expected return: cola, colb, colc thanks
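As far as I can tell, the built-in functions in Flink 1.8 do not include a split-to-columns function, so a scalar UDF is the usual route. The eval() body such a UDF would need, sketched in plain Java (splitIndex is a hypothetical name):

```java
import java.util.regex.Pattern;

public class SplitSketch {
    // The eval() body of a hypothetical scalar UDF: return the idx-th
    // piece of s split on sep, or null when idx is out of range.
    public static String splitIndex(String s, String sep, int idx) {
        String[] parts = s.split(Pattern.quote(sep), -1);
        return idx >= 0 && idx < parts.length ? parts[idx] : null;
    }

    public static void main(String[] args) {
        String col = "a,b,c";
        System.out.println(splitIndex(col, ",", 0)); // a
        System.out.println(splitIndex(col, ",", 2)); // c
    }
}
```

Registered as a ScalarFunction, one could then select something like splitIndex(col, ',', 0) AS cola, splitIndex(col, ',', 1) AS colb, and so on.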
Re: Accessing variables build within a map/flatmap function within a DS
Sure guys, thanks for the support. I need to create and register a table based on the content of a DS<>. The point is that within the content I need to parse out which part is the values and which part is the headers. I already tried to create a DS and register the new DS as a table, with the headers and values caught within the anonymous function. However, the anonymous function cannot reach my env and tenv; therefore I can indeed create the DS with the values, but I need to store the headers so they can be accessed outside the anonymous function, and then register in the tenv the content of the new DS together with the headers from my variable. A bit weird, maybe, I know, but this is what we need :). AU On Wed, Jul 24, 2019 at 9:01 AM Chesnay Schepler wrote: > Note that this will only work when running the application in the IDE; > specifically, it will not work when running on an actual cluster, since your > function isn't executed on the same machine as your (presumably) main[] > function. > > We can give you better advice if you tell us what exactly you're trying to > achieve. > > On 24/07/2019 14:53, Caizhi Weng wrote: > > Hi Andres, > > Just define a variable outside and modify it in the anonymous class. > > Andres Angel 于2019年7月24日周三 下午8:44写道: > >> Hello everyone, >> >> I was wondering if there is a way to read, outside of the DS method, the >> content of a variable built within a map/flatMap function. >> >> example: >> >> DataStream<String> dsString = env.fromElements("1,a,1.1|2,b,2.2,-2", >> "3,c|4,d,4.4"); >> DataStream<String> dsTuple = dsString.flatMap(new >> FlatMapFunction<String, String>() { >> @Override >> public void flatMap(String value, Collector<String> out) throws >> Exception { >> String var1 = "dummy"; >> >> } >> }); >> >> System.out.print(var1); >> >> thanks so much >> > >
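A plain-Java sketch of Caizhi's "define a variable outside and modify it" suggestion, with Chesnay's caveat baked in as a comment; Consumer stands in for the FlatMapFunction, and the "header|payload" layout is invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class CaptureDemo {
    public static List<String> collectHeaders(List<String> records) {
        // Effectively-final holder that the anonymous class mutates.
        // As noted above, this only works while everything runs in one
        // JVM (e.g. in the IDE); on a cluster the function executes on
        // another machine and the holder is not shared.
        List<String> headers = new ArrayList<>();

        Consumer<String> flatMapLike = new Consumer<String>() {
            @Override
            public void accept(String value) {
                headers.add(value.split("\\|")[0]); // keep only the header part
            }
        };

        records.forEach(flatMapLike);
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(collectHeaders(Arrays.asList("h1|payload", "h2|payload")));
    }
}
```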
Accessing variables build within a map/flatmap function within a DS
Hello everyone, I was wondering if there is a way to read, outside of the DS method, the content of a variable built within a map/flatMap function. example: DataStream<String> dsString = env.fromElements("1,a,1.1|2,b,2.2,-2", "3,c|4,d,4.4"); DataStream<String> dsTuple = dsString.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { String var1 = "dummy"; } }); System.out.print(var1); thanks so much
Re: Create Tuple Array dynamically from a DS
Hello Weng, This definitely helps a lot. However, I know my initial DS has a single row of content, so in theory I would just create one DS, which is what I need. That is why I need to know how to create a new environment DS within a map function. thanks so much On Tue, Jul 23, 2019 at 11:41 PM Caizhi Weng wrote: > Hi Andres, > > Thanks for the detailed explanation. > > but apparently I can't create a new DS within a map function > > If you create a new DS within the map function, then you'll create as many > DSs as the number of elements in the old DS, which... doesn't seem to be > your desired situation? I suppose you want to create a DS<Tuple> from a > DS<String>. If that is the case you can write something like this: > > public static void main(String[] args) throws Exception { >    StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > >    DataStream<String> dsString = env.fromElements("1,a,1.1", "2,b,2.2,-2", > "3,c", "4,d,4.4"); >    DataStream<Tuple> dsTuple = dsString.map(s -> { >      String[] split = s.split(","); >      if (split.length == 2) { >        return new Tuple2<>(Integer.valueOf(split[0]), split[1]); >      } else if (split.length == 3) { >        return new Tuple3<>(Integer.valueOf(split[0]), split[1], > Double.valueOf(split[2])); >      } else { >        return new Tuple4<>(Integer.valueOf(split[0]), split[1], > Double.valueOf(split[2]), Long.valueOf(split[3])); >      } >    }); > >    dsTuple.print(); >    env.execute(); > } > > How dynamically create the DS > > As you can see in the above code, I did not create a DS of one specific Tuple type but a > DS<Tuple>, because Tuple can't be directly used. It seems that you want to > turn this new DS into a table, but if different records have different > numbers of columns this is not good practice, as the schema of each record > is not the same (but as a workaround, you can fill the columns with null if > some record doesn't have a column). > > Hope this solves your problem. If you have any other problems feel free to > write back. 
> > Andres Angel 于2019年7月24日周三 上午10:50写道: > >> Hello, >> >> Let me list the questions I have properly: >> >> * How to catch the content of a DataStream into a String? About this >> point: basically I have a DS, and the only ways I can use the >> content are within a map function, printing it, storing the content somewhere, or >> SQL queries. The point is that I need the content because, depending on that >> content, I need to create another DS and later register it as a Table >> environment, which means I need the value content but likewise the header >> content, and the whole info is within the DS. The first option I had >> was to play with the map function, but apparently I can't create a new DS >> within a map function, much less register it as a new table environment. >> >> My second option at this point could be to create a sort of public variable >> to store the DS content and then create my UDF, but sadly this is >> not allowed either. My options in this case would be to either somehow store >> the content of the DS publicly into a new variable, turn the >> DS into a String, or store the content in a file, read the file, and >> start over parsing the content to serve the headers and content for the new >> DS. >> >> * How to dynamically create the DS: well, basically, after parsing the >> point above I might end up with an array of fields, sometimes 4, 3, 2, it doesn't >> matter; then I might need to swing between different Tuples or turn my >> content into Row to create a DS. >> >> I'm looking forward to reading your comments. >> >> thanks so much >> >> On Tue, Jul 23, 2019 at 10:34 PM Caizhi Weng >> wrote: >> >>> Hi Andres, >>> >>> Sorry, I can't quite get your question... Do you mean how to split >>> the string into fields? >>> >>> There is a `split` method in Java. You can give it a regexp and it will >>> return an array containing all the split fields. 
>>> >>> Andres Angel 于2019年7月24日周三 上午10:28写道: >>> >>>> Hello Weng, >>>> >>>> thanks for your reply. However, I'm struggling to somehow read the >>>> content of my DS, with the payload that defines how many fields the message >>>> contains, into a String. That is the reason why I thought of a map >>>> function for that DS. >>>> >>>> The Tuple part can change over time; it can even go from 3 or 4 to 2, and >>>> it can change the whole time. How could I approach this challenge?
Re: Create Tuple Array dynamically from a DS
Hello, Let me list the questions I have properly: * How to catch the content of a DataStream into a String? About this point: basically I have a DS, and the only ways I can use the content are within a map function, printing it, storing the content somewhere, or SQL queries. The point is that I need the content because, depending on that content, I need to create another DS and later register it as a Table environment, which means I need the value content but likewise the header content, and the whole info is within the DS. The first option I had was to play with the map function, but apparently I can't create a new DS within a map function, much less register it as a new table environment. My second option at this point could be to create a sort of public variable to store the DS content and then create my UDF, but sadly this is not allowed either. My options in this case would be to either somehow store the content of the DS publicly into a new variable, turn the DS into a String, or store the content in a file, read the file, and start over parsing the content to serve the headers and content for the new DS. * How to dynamically create the DS: well, basically, after parsing the point above I might end up with an array of fields, sometimes 4, 3, 2, it doesn't matter; then I might need to swing between different Tuples or turn my content into Row to create a DS. I'm looking forward to reading your comments. thanks so much On Tue, Jul 23, 2019 at 10:34 PM Caizhi Weng wrote: > Hi Andres, > > Sorry, I can't quite get your question... Do you mean how to split the > string into fields? > > There is a `split` method in Java. You can give it a regexp and it will > return an array containing all the split fields. > > Andres Angel 于2019年7月24日周三 上午10:28写道: > >> Hello Weng, >> >> thanks for your reply. However, I'm struggling to somehow read the content >> of my DS, with the payload that defines how many fields the message contains, >> into a String. That is the reason why I thought of a map function for >> that DS. 
>> >> The Tuple part can change over time; it can even go from 3 or 4 to 2, and it >> can change the whole time. How could I approach this challenge? >> >> thanks so much >> >> On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng >> wrote: >> >>> Hi Andres, >>> >>> Are the payloads strings? If yes, one method is that you can store them >>> as strings and process them further with user-defined functions when you need >>> to use them. >>> >>> Another method is that you can store them in arrays. >>> >>> Also, if the types of the first 3 fields are the same for the first and >>> second payloads, you can use a Tuple4<> and set the last element to null for >>> the first payload. >>> >>> Andres Angel 于2019年7月24日周三 上午10:09写道: >>> >>>> Hello everyone, >>>> >>>> I need to create the size of my Tuple dynamically to feed a DS; let >>>> me explain it better. Let's assume the first payload I read has this format >>>> "field1,field2,field3"; this might require a Tuple3<>, but my payload >>>> later can be "field1,field2,field3,field4", so my Tuple might need to be >>>> refined on the fly and now be a Tuple4<>. >>>> >>>> How could I create this dynamically, any idea? >>>> >>>> Thanks so much >>>
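The plain-Java core of the suggestion quoted above: parse each payload into an Object[] whose length varies per record, typing fields by position (int, String, double, long, matching the "1,a,1.1,-2"-style examples; at least two fields are assumed, which is an assumption of this sketch).

```java
public class DynamicArity {
    // Parse a comma-separated payload into an Object[] of varying
    // length; fields are typed by position as in the example above.
    // Assumes at least two fields per record.
    public static Object[] parse(String payload) {
        String[] split = payload.split(",");
        Object[] fields = new Object[split.length];
        fields[0] = Integer.valueOf(split[0]);
        fields[1] = split[1];
        if (split.length > 2) fields[2] = Double.valueOf(split[2]);
        if (split.length > 3) fields[3] = Long.valueOf(split[3]);
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse("3,c").length);        // 2
        System.out.println(parse("2,b,2.2,-2").length); // 4
    }
}
```

In Flink the resulting Object[] could feed Row.of(fields), which sidesteps the fixed-arity Tuple classes entirely.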
Re: Create Tuple Array dynamically from a DS
Hello Weng, thanks for your reply. However, I'm struggling to somehow read the content of my DS, with the payload that defines how many fields the message contains, into a String. That is the reason why I thought of a map function for that DS. The Tuple part can change over time; it can even go from 3 or 4 to 2, and it can change the whole time. How could I approach this challenge? thanks so much On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng wrote: > Hi Andres, > > Are the payloads strings? If yes, one method is that you can store them as > strings and process them further with user-defined functions when you need to > use them. > > Another method is that you can store them in arrays. > > Also, if the types of the first 3 fields are the same for the first and > second payloads, you can use a Tuple4<> and set the last element to null for > the first payload. > > Andres Angel 于2019年7月24日周三 上午10:09写道: > >> Hello everyone, >> >> I need to create the size of my Tuple dynamically to feed a DS; let me >> explain it better. Let's assume the first payload I read has this format >> "field1,field2,field3"; this might require a Tuple3<>, but my payload >> later can be "field1,field2,field3,field4", so my Tuple might need to be >> refined on the fly and now be a Tuple4<>. >> >> How could I create this dynamically, any idea? >> >> Thanks so much >> >
Create and register a new DS within a map function of a DS
Hello everyone, I need to read an element from my DS and, according to its content, create a new DS on the fly and register it as a new table in the environment. I'm using the map function on my input DS; however, when I try to use the variable env (my environment, in my case a StreamExecutionEnvironment) I apparently can't access the global job environment (I declare my env as final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);). Apparently exactly the same is going on with the tenv (TableEnvironment) when I register the new table environment within the map function of the DS. Please correct me if I'm wrong, but is what I'm trying to do really possible? If not, how could I parse the DS content as a String to create it in a separate method? thanks so much
Re: Transform from Table to DS
This has been fixed now. Something weird: according to the documentation, I should include around 4 Maven packages to work properly with the Table/SQL API https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/ . However, I solved my issue without them; I just left: org.apache.flink flink-table_2.11 1.6.1 Thanks so much On Tue, Jul 23, 2019 at 9:54 PM Caizhi Weng wrote: > Hi Andres, > > Can you paste your entire code (including the import section) in this > post? This Exception might have something to do with your imports. > If you are coding in a Java environment then you should import > StreamTableEnvironment.java, not StreamTableEnvironment.scala. > > Andres Angel 于2019年7月24日周三 上午12:01写道: > >> Hello guys, I'm working in a Java environment and I have sample code like: >> >> Table schemafit = tenv.sqlQuery("Here is my query"); >> >> I need to turn this into a DS to print it and apply other transformations, so >> I'm doing a sort of: >> >> DataStream<Row> resultSet = tenv.toAppendStream(schemafit, Row.class); >> >> resultSet.print(); >> >> However (please, any help), I'm getting the error: >> >> Exception in thread "main" java.lang.NoClassDefFoundError: >> org/apache/flink/api/scala/typeutils/CaseClassTypeInfo >> at >> org.apache.flink.table.codegen.CodeGenUtils$.fieldAccessorFor(CodeGenUtils.scala:246) >> at >> org.apache.flink.table.codegen.CodeGenerator.generateFieldAccess(CodeGenerator.scala:1217) >> at >> org.apache.flink.table.codegen.CodeGenerator.generateInputAccess(CodeGenerator.scala:1165) >> at >> org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:286) >> at >> org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:269) >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> at >> 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >> at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) >> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >> at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) >> at >> org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269) >> at >> org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:111) >> at >> org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74) >> at >> org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:39) >> at >> org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:82) >> at >> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:103) >> at >> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) >> at >> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:969) >> at >> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896) >> at >> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866) >> at >> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:202) >> at >> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:156) >> at consumer.trconsumer.main(trconsumer.java:116) >> Caused by: java.lang.ClassNotFoundException: >> org.apache.flink.api.scala.typeutils.CaseClassTypeInfo >> at java.net.URLClassLoader.findClass(URLClassLoader.java:382) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) >> at 
java.lang.ClassLoader.loadClass(ClassLoader.java:357) >> ... 24 more >> >
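A note on the missing class: org.apache.flink.api.scala.typeutils.CaseClassTypeInfo ships in Flink's Scala API artifacts, so another plausible fix for this NoClassDefFoundError (an assumption based on the package name, not verified against this exact project) is to add the Scala streaming dependency alongside flink-table:

```xml
<!-- Hypothetical addition for Flink 1.6.1 / Scala 2.11: the Table planner
     in this version is implemented in Scala and needs these classes at runtime. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.6.1</version>
</dependency>
```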
Create Tuple array dynamically from a DS
Hello everyone, I need to create the size of the Tuple that feeds my DS dynamically; let me explain it better. Let's assume the first payload I read has the format "field1,field2,field3" — that requires a Tuple3<>. But a later payload can be "field1,field2,field3,field4", so the Tuple would need to be refined on the fly into a Tuple4<>. How could I create this dynamically? Any idea? Thanks so much
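Since Java generics fix a Tuple's arity at compile time, one common workaround is to carry the record in a container whose width follows the data (Flink's org.apache.flink.types.Row is the idiomatic arity-flexible record type for the Table API). A minimal sketch, assuming all fields are strings:

```java
public class DynamicArity {
    // Split a CSV payload into a record whose width follows the data,
    // instead of a compile-time-fixed Tuple3/Tuple4.
    static String[] toRecord(String payload) {
        return payload.split(",");
    }

    public static void main(String[] args) {
        // Arity adapts per payload: 3 fields here, 4 below.
        System.out.println(toRecord("field1,field2,field3").length);        // 3
        System.out.println(toRecord("field1,field2,field3,field4").length); // 4
    }
}
```

In a Flink pipeline the same idea would apply inside a MapFunction<String, String[]> (or a MapFunction producing Row), so the downstream operators never depend on a fixed tuple arity.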
Transform from Table to DS
Hello guys, I'm working in a Java environment and I have sample code like:

Table schemafit = tenv.sqlQuery("Here is my query");

I need to turn this into a DS to print it and apply further transformations, so I'm doing something like:

DataStream<Row> resultSet = tenv.toAppendStream(schemafit, Row.class);
resultSet.print();

However (please, any help), I'm getting the error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/scala/typeutils/CaseClassTypeInfo
    at org.apache.flink.table.codegen.CodeGenUtils$.fieldAccessorFor(CodeGenUtils.scala:246)
    at org.apache.flink.table.codegen.CodeGenerator.generateFieldAccess(CodeGenerator.scala:1217)
    at org.apache.flink.table.codegen.CodeGenerator.generateInputAccess(CodeGenerator.scala:1165)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:286)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$3.apply(CodeGenerator.scala:269)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234)
    at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
    at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.generateConversionProcessFunction(StreamScan.scala:111)
    at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:74)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:39)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:82)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:103)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
    at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:969)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:202)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:156)
    at consumer.trconsumer.main(trconsumer.java:116)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 24 more
Use batch and stream environment in a single pipeline
Hello everyone, I need to create a table from a stream environment, and since I'm thinking in a pure SQL approach, I was wondering whether I could create a few of the enrichment tables in a batch table environment and register only the streaming payload as a streaming table. I tried to create a batch table environment on top of a streaming execution environment, but it broke. So I don't know: within the same flow, can I have two different environments, one batch and one streaming? Thanks so much AU