UtkarshSharma2612 commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-2069280084
I am writing to Iceberg using Spark Structured Streaming; my Spark version is 3.4.2 and my Iceberg version is 2. I am facing this issue as well, and changing `.option()` to `.toTable()` didn't help.
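
For context, the write is set up roughly like the sketch below. The table name, checkpoint location, and `fanout-enabled` setting match what shows up in the logical plan further down; the catalog registration, warehouse path, trigger interval, and Kafka bootstrap servers are placeholders rather than my exact configuration. The target table is partitioned by `years(timestamp)`, which is the transform the error complains about.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Placeholder catalog/warehouse config; the job uses a catalog named "local".
val spark = SparkSession.builder()
  .appName("conversion-analytics-stream")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "hadoop")
  .config("spark.sql.catalog.local.warehouse", "/tmp/warehouse")
  .getOrCreate()

// Read from Kafka (bootstrap servers are a placeholder). The mapping of the
// message value into ConversionDTO and the projection onto the table columns
// is omitted here.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "com.engati.write.user.journey.conversion.data.to.db")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

// Streaming append into the Iceberg table partitioned by years(timestamp).
// This is the call that fails; using .option("path", ...) instead of
// .toTable(...) behaves the same way for me.
val query = events.writeStream
  .format("iceberg")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("1 minute")) // interval is a placeholder
  .option("checkpointLocation", "/tmp/checkpointOne")
  .option("fanout-enabled", "true")
  .toTable("local.user_journey.conversion_analytics")

query.awaitTermination()
```

Full log from the failed run: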
```
24/04/22 12:15:45 WARN V2ExpressionUtils: V2 function years with parameter
types (timestamp) is used in partition transforms, but its definition couldn't
be found in the function catalog provided
24/04/22 12:15:45 WARN V2ExpressionUtils: V2 function years with parameter
types (timestamp) is used in partition transforms, but its definition couldn't
be found in the function catalog provided
24/04/22 12:15:45 ERROR MicroBatchExecution: Query [id =
5222ef61-86e7-4f64-981c-6f330483730b, runId =
c873f460-5c5b-43f3-a182-9bfa09d29b34] terminated with error
org.apache.spark.sql.AnalysisException: years(timestamp) is not currently
supported
at
org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
at
org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$2(DistributionAndOrderingUtils.scala:45)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at
org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:45)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:95)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at
scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at
scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at
org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:91)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at
org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:83)
at
org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:717)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
24/04/22 12:15:45 ERROR MicroBatchExecution: Query [id =
4cf32767-e385-43b7-8550-09125c6f638c, runId =
6de5e2e7-7638-4289-9a5c-aa45f2281889] terminated with error
org.apache.spark.sql.AnalysisException: years(timestamp) is not currently
supported
at
org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
at
org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$2(DistributionAndOrderingUtils.scala:45)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at
org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:45)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:95)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at
scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at
scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at
org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:91)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at
org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:83)
at
org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:717)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
24/04/22 12:15:45 INFO AppInfoParser: App info kafka.admin.client for
adminclient-2 unregistered
24/04/22 12:15:45 INFO AppInfoParser: App info kafka.admin.client for
adminclient-1 unregistered
24/04/22 12:15:45 INFO Metrics: Metrics scheduler closed
24/04/22 12:15:45 INFO Metrics: Closing reporter
org.apache.kafka.common.metrics.JmxReporter
24/04/22 12:15:45 INFO Metrics: Metrics reporters closed
24/04/22 12:15:45 INFO MicroBatchExecution: Async log purge executor pool
for query [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId =
6de5e2e7-7638-4289-9a5c-aa45f2281889] has been shutdown
24/04/22 12:15:45 INFO Metrics: Metrics scheduler closed
24/04/22 12:15:45 INFO Metrics: Closing reporter
org.apache.kafka.common.metrics.JmxReporter
24/04/22 12:15:45 INFO Metrics: Metrics reporters closed
24/04/22 12:15:45 INFO MicroBatchExecution: Async log purge executor pool
for query [id = 5222ef61-86e7-4f64-981c-6f330483730b, runId =
c873f460-5c5b-43f3-a182-9bfa09d29b34] has been shutdown
24/04/22 12:15:45 ERROR TestJobPipelineImpl: stream error: {}
org.apache.spark.sql.streaming.StreamingQueryException: years(timestamp) is
not currently supported
=== Streaming Query ===
Identifier: [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId =
6de5e2e7-7638-4289-9a5c-aa45f2281889]
Current Committed Offsets: {}
Current Available Offsets:
{KafkaV2[Subscribe[com.engati.write.user.journey.conversion.data.to.db]]:
{"com.engati.write.user.journey.conversion.data.to.db":{"0":729547}}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource RelationV2[bot_ref#441, user_id#442,
timestamp#443, source_type#444, conversion_type#445, ad_id#446, ad_source#447,
ad_type#448, broadcast_id#449, broadcast_response_type#450, flow_id#451,
attribute_id#452] local.user_journey.conversion_analytics
local.user_journey.conversion_analytics,
local.user_journey.conversion_analytics, 4cf32767-e385-43b7-8550-09125c6f638c,
[checkpointLocation=/tmp/checkpointOne, fanout-enabled=true], Append
+- Project [bot_ref#78, user_id#91, timestamp#143, source_type#156,
conversion_type#195, ad_id#104, ad_source#117, ad_type#130, broadcast_id#169,
broadcast_response_type#182, flow_id#208, attribute_id#221]
+- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43 AS
attribute_id#221, bot_ref#78, broadcast_id#169, broadcast_response_type#182,
conversion_type#195, flow_id#208, source_type#156, timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43,
bot_ref#78, broadcast_id#169, broadcast_response_type#182, conversion_type#195,
flowId#48 AS flow_id#208, source_type#156, timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43,
bot_ref#78, broadcast_id#169, broadcast_response_type#182, conversionType#47 AS
conversion_type#195, flowId#48, source_type#156, timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130,
attributeId#43, bot_ref#78, broadcast_id#169, broadcastResponseType#46 AS
broadcast_response_type#182, conversionType#47, flowId#48, source_type#156,
timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130,
attributeId#43, bot_ref#78, broadcastId#45 AS broadcast_id#169,
broadcastResponseType#46, conversionType#47, flowId#48, source_type#156,
timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130,
attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46,
conversionType#47, flowId#48, sourceType#49 AS source_type#156, timestamp#143,
user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130,
attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46,
conversionType#47, flowId#48, sourceType#49, timestamp#50 AS timestamp#143,
user_id#91]
+- Project [ad_id#104, ad_source#117, adType#42 AS
ad_type#130, attributeId#43, bot_ref#78, broadcastId#45,
broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49,
timestamp#50, user_id#91]
+- Project [ad_id#104, adSource#41 AS
ad_source#117, adType#42, attributeId#43, bot_ref#78, broadcastId#45,
broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49,
timestamp#50, user_id#91]
+- Project [adId#40 AS ad_id#104, adSource#41,
adType#42, attributeId#43, bot_ref#78, broadcastId#45,
broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49,
timestamp#50, user_id#91]
+- Project [adId#40, adSource#41,
adType#42, attributeId#43, bot_ref#78, broadcastId#45,
broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49,
timestamp#50, userId#51 AS user_id#91]
+- Project [adId#40, adSource#41,
adType#42, attributeId#43, botRef#44 AS bot_ref#78, broadcastId#45,
broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49,
timestamp#50, userId#51]
+- TypedFilter
com.engati.analytics.etl.pipeline.impl.TestJobPipelineImpl$$Lambda$1655/1994143461@2556e117,
class com.engati.analytics.etl.extract.models.ConversionDTO,
[StructField(adId,StringType,true), StructField(adSource,StringType,true),
StructField(adType,StringType,true), StructField(attributeId,IntegerType,true),
StructField(botRef,IntegerType,true), StructField(broadcastId,StringType,true),
StructField(broadcastResponseType,StringType,true),
StructField(conversionType,StringType,true),
StructField(flowId,IntegerType,true), StructField(sourceType,StringType,true),
StructField(timestamp,TimestampType,true),
StructField(userId,StringType,true)], initializejavabean(newInstance(class
com.engati.analytics.etl.extract.models.ConversionDTO),
(setFlowId,staticinvoke(class java.lang.Integer, ObjectType(class
java.lang.Integer), valueOf, cast(flowId#48 as int), true, false, true)),
(setConversionType,cast(conversionType#47 as string).toString)
, (setTimestamp,staticinvoke(class
org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class
java.sql.Timestamp), toJavaTimestamp, cast(timestamp#50 as timestamp), true,
false, true)), (setBotRef,staticinvoke(class java.lang.Integer,
ObjectType(class java.lang.Integer), valueOf, cast(botRef#44 as int), true,
false, true)), (setBroadcastId,cast(broadcastId#45 as string).toString),
(setAdType,cast(adType#42 as string).toString), (setAdSource,cast(adSource#41
as string).toString), (setBroadcastResponseType,cast(broadcastResponseType#46
as string).toString), (setAttributeId,staticinvoke(class java.lang.Integer,
ObjectType(class java.lang.Integer), valueOf, cast(attributeId#43 as int),
true, false, true)), (setSourceType,cast(sourceType#49 as string).toString),
(setUserId,cast(userId#51 as string).toString), (setAdId,cast(adId#40 as
string).toString))
+- SerializeFromObject
[staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType,
fromString, knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAdId, true,
false, true) AS adId#40, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAdSource,
true, false, true) AS adSource#41, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAdType, true,
false, true) AS adType#42, knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO,
true])).getAttributeId.intValue AS attributeId#43,
knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO, true])).getB
otRef.intValue AS botRef#44, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO, true])).getBroadcastId,
true, false, true) AS broadcastId#45, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO,
true])).getBroadcastResponseType, true, false, true) AS
broadcastResponseType#46, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO,
true])).getConversionType, true, false, true) AS conversionType#47,
knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO,
true])).getFlowId.intValue AS flowId#48, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnul
l(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO, true])).getSourceType,
true, false, true) AS sourceType#49, staticinvoke(class
org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType,
fromJavaTimestamp, knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO, true])).getTimestamp,
true, false, true) AS timestamp#50, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
knownnotnull(assertnotnull(input[0,
com.engati.analytics.etl.extract.models.ConversionDTO, true])).getUserId, true,
false, true) AS userId#51]
+- MapElements
com.engati.analytics.etl.pipeline.impl.TestJobPipelineImpl$$Lambda$1641/1836606934@1e93834b,
class java.lang.String, [StructField(value,StringType,true)], obj#39:
com.engati.analytics.etl.extract.models.ConversionDTO
+- DeserializeToObject
cast(value#21 as string).toString, obj#38: java.lang.String
+- Project [cast(value#8
as string) AS value#21]
+-
StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10,
offset#11L, timestamp#12, timestampType#13],
org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@48bd89fe,
KafkaV2[Subscribe[com.engati.write.user.journey.conversion.data.to.db]]
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: org.apache.spark.sql.AnalysisException: years(timestamp) is not
currently supported
at
org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
at
org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$2(DistributionAndOrderingUtils.scala:45)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at
org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:45)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:95)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
at
org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at
scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at
scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at
org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:91)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at
org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:83)
at
org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:717)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
... 1 more
24/04/22 12:15:45 INFO SparkContext: Invoking stop() from shutdown hook
24/04/22 12:15:45 INFO SparkContext: SparkContext is stopping with exitCode
0.
24/04/22 12:15:45 INFO SparkUI: Stopped Spark web UI at
http://ip-10-12-72-49.ap-south-1.compute.internal:4040
24/04/22 12:15:45 INFO StandaloneSchedulerBackend: Shutting down all
executors
24/04/22 12:15:45 INFO StandaloneSchedulerBackend$StandaloneDriverEndpoint:
Asking each executor to shut down
24/04/22 12:15:45 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
24/04/22 12:15:45 INFO MemoryStore: MemoryStore cleared
24/04/22 12:15:45 INFO BlockManager: BlockManager stopped
24/04/22 12:15:45 INFO BlockManagerMaster: BlockManagerMaster stopped
24/04/22 12:15:45 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
24/04/22 12:15:45 INFO SparkContext: Successfully stopped SparkContext
24/04/22 12:15:45 INFO ShutdownHookManager: Shutdown hook called
24/04/22 12:15:45 INFO ShutdownHookManager: Deleting directory
/tmp/spark-bd10ae64-0bf2-4638-a070-2074fe0aeef7
24/04/22 12:15:45 INFO ShutdownHookManager: Deleting directory
/tmp/spark-75a1686e-6c8f-483b-9923-0d09834ccbd3
```