LocationStrategies.PreferFixed in Structured Streaming
Hi,

We have a use case where data from specific Kafka partitions needs to be processed on specific executor nodes. In Spark Streaming this can be achieved with LocationStrategies.PreferFixed. How do we achieve the same in Structured Streaming?

*Spark Streaming*

import java.util.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.*;

Map<TopicPartition, String> partitionMapToHost = new HashMap<>();
partitionMapToHost.put(new TopicPartition("topic", 0), "node1");
partitionMapToHost.put(new TopicPartition("topic", 1), "node2");
partitionMapToHost.put(new TopicPartition("topic", 2), "node2");

KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferFixed(partitionMapToHost),
    ConsumerStrategies.<String, String>Subscribe(topicCollection, kafkaParams)); // assuming String keys/values

Regards,
Subacini
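P.S. For reference, a minimal Scala sketch of how we read the same topic with the Structured Streaming Kafka source (the broker address is a placeholder from our setup); we could not find any option on this source that pins partitions to hosts the way PreferFixed does:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("StructuredKafka").getOrCreate()

// Kafka source for Structured Streaming; unlike createDirectStream,
// there is no LocationStrategies parameter here that we can see.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node1:9092") // placeholder broker list
  .option("subscribe", "topic")
  .load()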
Drop Partition Fails
Hi,

When we execute a drop partition command on a Hive external table from spark-shell, we get the error below. The same command works fine from the Hive shell. It is a table with just two records.

Spark version: 1.5.2

scala> hiveCtx.sql("select * from spark_2_test").collect().foreach(println)
[1210,xcv,2016-10-10]
[1210,xcv,2016-10-11]

*Show create table*

CREATE EXTERNAL TABLE `spark_2_test`(
  `name` string,
  `dept` string)
PARTITIONED BY (
  `server_date` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs:///spark/sp'
TBLPROPERTIES (
  'STATS_GENERATED_VIA_STATS_TASK'='true',
  'transient_lastDdlTime'='1485202737')

scala> hiveCtx.sql("ALTER TABLE spark_2_test DROP IF EXISTS PARTITION (server_date='2016-10-10')")

17/01/23 22:09:04 ERROR Driver: FAILED: SemanticException [Error 10006]: Partition not found (server_date = 2016-10-10)
org.apache.hadoop.hive.ql.parse.SemanticException: Partition not found (server_date = 2016-10-10)
        at org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.addTableDropPartsOutputs(DDLSemanticAnalyzer.java:3178)
        at org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeAlterTableDropParts(DDLSemanticAnalyzer.java:2694)
        at org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeInternal(DDLSemanticAnalyzer.java:278)
        at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:227)
        at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:424)
        at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:308)
        at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1122)
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1170)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1049)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:451)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:440)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:278)
        at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:233)
        at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:270)
        at org.apache.spark.sql.hive.client.ClientWrapper.runHive(ClientWrapper.scala:440)
        at org.apache.spark.sql.hive.client.ClientWrapper.runSqlHive(ClientWrapper.scala:430)
        at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:561)
        at org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:144)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:129)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
        at $line105.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
        at $line105.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
        at $line105.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
        at $line105.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
        at $line105.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
        at $line105.$read$$iwC$$iwC$$iwC.<init>(<console>:37)
        at $line105.$read$$iwC$$iwC.<init>(<console>:39)
        at $line105.$read$$iwC.<init>(<console>:41)
        at $line105.$read.<init>(<console>:43)
        at $line105.$read$.<init>(<console>:47)
        at $line105.$read$.<clinit>(<console>)
        at $line105.$eval$.<init>(<console>:7)
        at $line105.$eval$.<clinit>(<console>)
        at $line105.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840

Thanks in advance,
Subacini
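P.S. In case it helps with diagnosis, this is how we can list what the metastore actually has registered for the table (a minimal sketch from the same spark-shell session):

scala> hiveCtx.sql("SHOW PARTITIONS spark_2_test").collect().foreach(println)

If server_date=2016-10-10 does not appear here, the metastore never registered that partition, which would explain the "Partition not found" error even though the data files exist.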