Re: [I] Nested object support in Hudi Table using Flink [hudi]
waytoharish closed issue #10895: Nested object support in Hudi Table using Flink URL: https://github.com/apache/hudi/issues/10895 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Nested object support in Hudi Table using Flink [hudi]
waytoharish commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2042890195

Thanks @ad1happy2go @danny0405, it worked for me after using `GenericRowData`. I am closing the issue.
Re: [I] Nested object support in Hudi Table using Flink [hudi]
waytoharish commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2042234221

I keep getting:

    java.lang.ClassCastException: class org.apache.flink.table.data.binary.BinaryStringData cannot be cast to class org.apache.flink.table.data.ArrayData (org.apache.flink.table.data.binary.BinaryStringData and org.apache.flink.table.data.ArrayData are in unnamed module of loader 'app')
        at org.apache.flink.table.data.GenericRowData.getArray(GenericRowData.java:195) ~[flink-table-common-1.15.0.jar:1.15.0]
        at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$10(RowData.java:265) ~[flink-table-common-1.15.0.jar:1.15.0]
        at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) ~[flink-table-common-1.15.0.jar:1.15.0]
        at org.apache.hudi.util.RowDataToAvroConverters$11.convert(RowDataToAvroConverters.java:271) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.util.RowDataToAvroConverters$10.convert(RowDataToAvroConverters.java:239) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.sink.transform.RowDataToHoodieFunction.toHoodieRecord(RowDataToHoodieFunction.java:109) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:97) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:46) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
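
One possible reading of this trace, not confirmed anywhere in the thread: `GenericRowData.getArray` is being called on a position that holds a string, which could happen if the row is filled in a different field order than the one the sink expects (for instance `partition` before `data` in the mapper, but `data` before `partition` in the declared columns). The sketch below uses plain Java stand-ins rather than Flink types; `Kind`, `kindOf`, and `firstMismatch` are hypothetical names introduced for illustration. It only shows how a positional check could surface such a mismatch before a record reaches the sink.

```java
import java.util.Arrays;
import java.util.List;

public class RowLayoutCheck {
    /** Hypothetical stand-in for a column's logical type; not a Flink class. */
    public enum Kind { STRING, INT, TIMESTAMP, ARRAY }

    /** Maps a plain Java value to the stand-in kind it would occupy in a row. */
    public static Kind kindOf(Object value) {
        if (value instanceof String) return Kind.STRING;
        if (value instanceof Integer) return Kind.INT;
        if (value instanceof Long) return Kind.TIMESTAMP; // epoch millis as a stand-in
        if (value instanceof List) return Kind.ARRAY;
        throw new IllegalArgumentException("Unsupported value: " + value);
    }

    /** Returns the first position whose value does not match the schema, or -1 if all match. */
    public static int firstMismatch(List<Kind> schema, List<Object> row) {
        if (schema.size() != row.size()) {
            throw new IllegalArgumentException(
                "Arity mismatch: " + schema.size() + " vs " + row.size());
        }
        for (int i = 0; i < schema.size(); i++) {
            if (kindOf(row.get(i)) != schema.get(i)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Assumed column order: uuid, name, age, ts, data, partition.
        List<Kind> schema = Arrays.asList(
            Kind.STRING, Kind.STRING, Kind.INT, Kind.TIMESTAMP, Kind.ARRAY, Kind.STRING);
        // Row filled with the partition string at index 4 and the array at index 5.
        List<Object> row = Arrays.<Object>asList(
            "c1", "name9", 23, 1L, "p1", Arrays.asList("m1"));
        System.out.println("first mismatch at index " + firstMismatch(schema, row));
    }
}
```

With the assumed orders above, the check reports index 4, the position where a string sits in the slot declared as an array. Swapping the last two values in the row makes it return -1.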
Re: [I] Nested object support in Hudi Table using Flink [hudi]
waytoharish commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2042041118

Hi @ad1happy2go, I have added the following:

    @Override
    public RowData map(Telemetry kafkaRecord) throws Exception {
        GenericRowData row = new GenericRowData(6);
        row.setField(0, StringData.fromString(kafkaRecord.getCampaignName()));
        row.setField(1, StringData.fromString("name9"));
        row.setField(2, 23);
        row.setField(3, TimestampData.fromEpochMillis(1));
        row.setField(4, StringData.fromString("p1"));
        row.setField(5, kafkaRecord.getData());
        return row;
    }

I can see data pushed to S3, but no table has been created in Glue.
Re: [I] Nested object support in Hudi Table using Flink [hudi]
waytoharish commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2031270238

Hi @ad1happy2go, I am still facing the issue. I am not able to figure out how to do this with `GenericRowData`.
Re: [I] Nested object support in Hudi Table using Flink [hudi]
ad1happy2go commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2029718076

@waytoharish Did you get a chance to try out `GenericRowData`? Are you still facing the issue?
Re: [I] Nested object support in Hudi Table using Flink [hudi]
danny0405 commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2019378477

Another way is to use `GenericRowData`, which is much simpler, or just use `Row` and convert it back to `RowData` with a utility.
Re: [I] Nested object support in Hudi Table using Flink [hudi]
waytoharish commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2019330463

@danny0405 @ad1happy2go Could you please help me understand whether this is a bug or whether I am doing something wrong?
Re: [I] Nested object support in Hudi Table using Flink [hudi]
waytoharish commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2014848588

Thanks for your time @ad1happy2go @danny0405. Here is the error I am getting after the code change:

    14:38:06,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka Source -> (Map -> row_data_to_hoodie_record, Sink: Print to Std. Out) (6/10) (b41f7215661da4d1d3d1c157a58c57e4) switched from RUNNING to FAILED on 8d293143-6737-4451-a2c5-4e4f6f85cbd4 @ localhost (dataPort=-1).
    java.io.IOException: Failed to deserialize consumer record due to
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-base-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        ... 14 more
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
Re: [I] Nested object support in Hudi Table using Flink [hudi]
ad1happy2go commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2014803349

@waytoharish As discussed on the call, can you provide the latest code and the exception we were getting?
Re: [I] Nested object support in Hudi Table using Flink [hudi]
waytoharish commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011376344

Thanks @danny0405. I am able to map the array type using the code below, but I am not sure I am doing it correctly:

    private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
        return HoodiePipeline.builder(targetTable)
            .column("uuid VARCHAR(256)")
            .column("name VARCHAR(10)")
            .column("age INT")
            .column("ts TIMESTAMP(3)")
            .column("data ARRAY>")
            .column("`partition` VARCHAR(20)")
            .pk("uuid")
            .partition("partition")
            .options(options);
    }

    // Define the schema for Hudi records
    public static final DataType ROW_ARRAY_DATA_TYPE = ROW(
        DataTypes.FIELD("measure_name", DataTypes.VARCHAR(256)), // record key
        DataTypes.FIELD("type", DataTypes.VARCHAR(10)))
        .notNull();

    public static final DataType ROW_DATA_TYPE = ROW(
        DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
        DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
        DataTypes.FIELD("age", DataTypes.INT()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("partition", DataTypes.VARCHAR(10)),
        DataTypes.FIELD("data", DataTypes.ARRAY(ROW_ARRAY_DATA_TYPE)))
        .notNull();

Now I am trying to figure out how to map it to `RowData`. I tried:

    static class HudiDataSource implements MapFunction<Telemetry, RowData> {
        @Override
        public RowData map(Telemetry kafkaRecord) throws Exception {
            return insertRow(
                StringData.fromString(kafkaRecord.getCampaignName()),
                StringData.fromString("Danny"),
                23,
                TimestampData.fromEpochMillis(1),
                StringData.fromString("par1"),
                insertRow(
                    StringData.fromString(kafkaRecord.getCampaignName()),
                    StringData.fromString("Danny")));
        }
    }

but it is failing with:

    static class HudiDataSource implements MapFunction<Telemetry, RowData> {
        @Override
        public RowData map(Telemetry kafkaRecord) throws Exception {
            return insertRow(
                StringData.fromString(kafkaRecord.getCampaignName()),
                StringData.fromString("Danny"),
                23,
                TimestampData.fromEpochMillis(1),
                StringData.fromString("par1"),
                insertRow(
                    StringData.fromString(kafkaRecord.getCampaignName()),
                    StringData.fromString("Danny")));
        }
    }
Re: [I] Nested object support in Hudi Table using Flink [hudi]
danny0405 commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011334497

Hmm, I guess it is because the `HoodiePipeline` class does not concatenate the fields in good shape; that might be a bug.
Re: [I] Nested object support in Hudi Table using Flink [hudi]
waytoharish commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011288296

Hi @danny0405, I don't see the SQL in Flink. Here is the code I am trying in Java:

    public static final DataType ROW_DATA_TYPE = ROW(
        DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
        DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
        DataTypes.FIELD("age", DataTypes.INT()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("data", DataTypes.ARRAY(
            ROW(
                DataTypes.FIELD("test", DataTypes.VARCHAR(256)), // record key
                DataTypes.FIELD("type", DataTypes.VARCHAR(10))
            ))),
        DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
        .notNull();
Re: [I] Nested object support in Hudi Table using Flink [hudi]
danny0405 commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011275481

Can you show us the complete SQL text?
Re: [I] Nested object support in Hudi Table using Flink [hudi]
waytoharish commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011212848

@ad1happy2go I am getting the following error when I tried with:

    DataTypes.FIELD("data", DataTypes.ARRAY(
        ROW(
            DataTypes.FIELD("test", DataTypes.VARCHAR(256)), // record key
            DataTypes.FIELD("type", DataTypes.VARCHAR(10))
        )))

    Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Incorrect syntax near the keyword 'ARRAY' at line 6, column 8.
    Was expecting one of:
        "ROW" ... ... ... ... ... ... ... "STRING" ... "BYTES" ... "TIMESTAMP_LTZ" ... "ARRAY" ... "ARRAY" "<" ... "RAW" ... "BOOLEAN" ... "INTEGER" ... "INT" ... "TINYINT" ... "SMALLINT" ... "BIGINT" ... "REAL" ... "DOUBLE" ... "FLOAT" ... "BINARY" ... "VARBINARY" ... "DECIMAL" ... "DEC" ... "NUMERIC" ... "ANY" ... "CHARACTER" ... "CHAR" ... "VARCHAR" ... "DATE" ... "TIME" ... "TIMESTAMP" ...
        at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:82)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
        at org.apache.hudi.util.HoodiePipeline$Builder.getTableDescriptor(HoodiePipeline.java:172)
        at org.apache.hudi.util.HoodiePipeline$Builder.sink(HoodiePipeline.java:162)
        at com.hudi.flink.quickstart.Kafka2HudiPipelineNew.main(Kafka2HudiPipelineNew.java:77)
    Caused by: org.apache.calcite.sql.parser.SqlParseException: Incorrect syntax near the keyword 'ARRAY' at line 6, column 8.
    Was expecting one of:
        "ROW" ... ... ... ... ... ... ...
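
The stack trace shows `HoodiePipeline$Builder.sink` calling `getTableDescriptor`, which in turn calls `TableEnvironmentImpl.executeSql`, so the strings passed to `.column(...)` end up being parsed as Flink SQL. A minimal sketch follows, assuming a simplified `CREATE TABLE` layout that is not Hudi's actual generated statement. `buildDdl` and `lineAt` are hypothetical helpers, and the nested column string is an assumed spelling rather than anything confirmed in the thread. The idea is only to assemble the column strings into one statement and print a given line, which may help locate what a message such as "line 6, column 8" is pointing at.

```java
import java.util.Arrays;
import java.util.List;

public class DdlPreview {
    /** Joins column definitions into a simplified CREATE TABLE text, one column per line. */
    public static String buildDdl(String table, List<String> columns, String primaryKey) {
        StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE ").append(table).append(" (\n");
        for (String column : columns) {
            sb.append("  ").append(column).append(",\n");
        }
        sb.append("  PRIMARY KEY (").append(primaryKey).append(") NOT ENFORCED\n");
        sb.append(")");
        return sb.toString();
    }

    /** Returns the requested line of the statement, counting from 1 as parser errors do. */
    public static String lineAt(String ddl, int lineNumber) {
        String[] lines = ddl.split("\n");
        if (lineNumber < 1 || lineNumber > lines.length) {
            throw new IllegalArgumentException("No such line: " + lineNumber);
        }
        return lines[lineNumber - 1];
    }

    public static void main(String[] args) {
        // Column strings in the style passed to .column(...); the nested one is an assumption.
        List<String> columns = Arrays.asList(
            "uuid VARCHAR(256)",
            "name VARCHAR(10)",
            "age INT",
            "ts TIMESTAMP(3)",
            "data ARRAY<ROW<test VARCHAR(256), `type` VARCHAR(10)>>",
            "`partition` VARCHAR(20)");
        String ddl = buildDdl("t1", columns, "uuid");
        System.out.println(ddl);
        System.out.println("line 6: " + lineAt(ddl, 6));
    }
}
```

In this simplified layout the fifth column lands on line 6 because the `CREATE TABLE` header takes line 1. The real statement built by `HoodiePipeline` may be laid out differently, so the line numbers here are only indicative.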
Re: [I] Nested object support in Hudi Table using Flink [hudi]
ad1happy2go commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2009802993

@waytoharish Can you let us know what error or issue you are facing? Or is it only giving a compilation error?