I used HCatalogIO:

    Map<String, String> configProperties = new HashMap<>();
    configProperties.put("hive.metastore.uris", "....");
    pipeline
        .apply(HCatalogIO.read()
            .withConfigProperties(configProperties)
            .withDatabase(DB_NAME)
            .withTable(TEST_TABLE))
        .apply("to-string", MapElements.via(new SimpleFunction<HCatRecord, String>() {
          @Override
          public String apply(HCatRecord input) {
            return input.toString();
          }
        }))
        .apply(TextIO.write().to("my-logs/output.txt").withoutSharding());

But I am getting this error:

    20/02/11 07:49:24 INFO spark.SparkContext: Starting job: collect at BoundedDataset.java:93
    Exception in thread "dag-scheduler-event-loop" java.lang.NoSuchMethodError: org.apache.hive.hcatalog.common.HCatUtil.getHiveMetastoreClient(Lorg/apache/hadoop/hive/conf/HiveConf;)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;
        at org.apache.beam.sdk.io.hcatalog.HCatalogUtils.createMetaStoreClient(HCatalogUtils.java:42)
        at org.apache.beam.sdk.io.hcatalog.HCatalogIO$BoundedHCatalogSource.getEstimatedSizeBytes(HCatalogIO.java:323)
        at org.apache.beam.runners.spark.io.SourceRDD$Bounded.getPartitions(SourceRDD.java:104)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        [... the MapPartitionsRDD.getPartitions / RDD.partitions frames repeat for each stage ...]
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
        at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:240)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:238)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.dependencies(RDD.scala:238)
        at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:512)
        at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:461)
        at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:448)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:962)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2067)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    20/02/11 07:52:54 INFO util.JsonImpl: Shutting down HTTP client
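A NoSuchMethodError on HCatUtil.getHiveMetastoreClient usually points to a Hive version conflict: the Hive/HCatalog jars visible at runtime (often the ones shipped with the Spark cluster) are a different release than the one beam-sdks-java-io-hcatalog was built against, and this particular factory method is not present in older Hive. A minimal reflection check, assuming you can run a small Java class with the cluster's classpath, shows which jar actually provides HCatUtil and which client-factory methods it exposes (the class name HCatVersionCheck is made up for illustration):

    import java.lang.reflect.Method;

    public class HCatVersionCheck {
      public static void main(String[] args) throws Exception {
        // Load HCatUtil through the same classpath the job would use.
        Class<?> hcatUtil = Class.forName("org.apache.hive.hcatalog.common.HCatUtil");
        // Print the jar the class was actually loaded from.
        System.out.println("HCatUtil loaded from: "
            + hcatUtil.getProtectionDomain().getCodeSource().getLocation());
        // List the client-related methods this Hive version exposes;
        // getHiveMetastoreClient(HiveConf) is the one Beam's HCatalogUtils calls.
        for (Method m : hcatUtil.getMethods()) {
          if (m.getName().contains("Client")) {
            System.out.println(m);
          }
        }
      }
    }

If the printed jar is an older hive-hcatalog-core than the version your Beam hcatalog module declares, aligning the two versions is the usual fix.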
From: Gershi, Noam [ICG-IT]
Sent: Tuesday, February 11, 2020 11:08 AM
To: user@beam.apache.org
Subject: RE: Apache Beam with Hive

Thanks. It looks like HCatalog could work. But is there an example with a 'SELECT' query?

JdbcIO is probably not a good fit for me, since my Spark cluster is already configured to work with Hive. When I code Spark pipelines, I can write queries without having to supply a user/password, and I would like something similar in Apache Beam.

From: utkarsh.kh...@gmail.com
Sent: Tuesday, February 11, 2020 10:26 AM
To: user@beam.apache.org
Subject: Re: Apache Beam with Hive

While I've not created Hive tables using this, reading and writing worked very well using HCatalogIO:
https://beam.apache.org/releases/javadoc/2.18.0/org/apache/beam/sdk/io/hcatalog/HCatalogIO.html
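On the 'SELECT' question above: HCatalogIO reads a table rather than executing a query, so the closest equivalent is withFilter(...) for partition pruning (per the javadoc, valid only for partitioned tables) combined with ordinary Beam transforms for row-level predicates. A sketch, where the partition column ds, its value, and the record layout are made up:

    import org.apache.beam.sdk.io.hcatalog.HCatalogIO;
    import org.apache.beam.sdk.transforms.Filter;
    import org.apache.hive.hcatalog.data.HCatRecord;

    pipeline
        .apply(HCatalogIO.read()
            .withConfigProperties(configProperties)
            .withDatabase(DB_NAME)
            .withTable(TEST_TABLE)
            // Partition filter, applied via the metastore; only for partitioned tables.
            .withFilter("ds=\"2020-02-11\""))
        // Row-level "WHERE": an ordinary Beam predicate over each HCatRecord.
        .apply("where-clause", Filter.by((HCatRecord r) -> r.get(0) != null));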
On Tue, Feb 11, 2020 at 11:38 AM Jean-Baptiste Onofre <j...@nanthrax.net> wrote:

Hi,

If you are OK with using Hive via JDBC, you can use JdbcIO and see the example in the javadoc:
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L82

Regards
JB

On 11 Feb 2020, at 07:03, Gershi, Noam <noam.ger...@citi.com> wrote:

Hi,

I am searching for a detailed example of how to use Apache Beam with Hive and/or HCatalog: creating tables, inserting data and fetching it…

Noam Gershi
Software Developer
ICG Technology – TLV Lab
T: +972 (3) 7405718
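Returning to JB's JdbcIO suggestion: a minimal read sketch against HiveServer2 follows. The host, table, and query are made up; the driver class org.apache.hive.jdbc.HiveDriver comes from the hive-jdbc artifact, which must be on the classpath. On a Kerberized cluster the principal can typically go in the JDBC URL (e.g. ;principal=hive/_HOST@REALM) instead of a user/password pair, which may address the credentials concern raised above.

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<String> names = pipeline.apply(JdbcIO.<String>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "org.apache.hive.jdbc.HiveDriver",
            "jdbc:hive2://hive-server:10000/default"))
        // An arbitrary SELECT, executed by HiveServer2 rather than by Beam.
        .withQuery("SELECT name FROM employee WHERE ds = '2020-02-11'")
        // Map each JDBC ResultSet row to a pipeline element.
        .withRowMapper((JdbcIO.RowMapper<String>) rs -> rs.getString(1))
        .withCoder(StringUtf8Coder.of()));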