[ https://issues.apache.org/jira/browse/STORM-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16894017#comment-16894017 ]
Alex Parrill commented on STORM-3473:
-------------------------------------

Found a fix. {{storm-hive}} depends on {{org.apache.hive.hcatalog:hive-hcatalog-core}} and {{hive-hcatalog-streaming}} version 2.8.5. Upgrading those jars to version 3.1.1 (by excluding them from the {{storm-hive}} dependency and depending on the newer versions directly) solves the issue, and Hive now sees the writes.
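Expressed in the topology's {{pom.xml}}, that override would look roughly like the sketch below. This is illustrative only: the {{storm-hive}} version shown (2.0.0) is assumed from the issue's Affects Versions field, and the exact coordinates should be checked against your build.

{code:xml}
<!-- Sketch of the dependency override described above. The storm-hive
     version (2.0.0) is an assumption taken from "Affects Versions". -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hive</artifactId>
    <version>2.0.0</version>
    <exclusions>
        <!-- Exclude the older hcatalog jars that storm-hive pulls in. -->
        <exclusion>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-streaming</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Depend on the 3.1.1 hcatalog jars directly so the streaming client
     on the worker classpath matches the Hive 3 server. -->
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-core</artifactId>
    <version>3.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-streaming</artifactId>
    <version>3.1.1</version>
</dependency>
{code}

The point is simply to get streaming-client jars that match the Hive 3 server onto the topology's classpath; the symptoms below suggest the older 2.x client writes delta files that the Hive 3 metastore never registers as committed.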
> Hive can't read records written from HiveBolt
> ---------------------------------------------
>
>                 Key: STORM-3473
>                 URL: https://issues.apache.org/jira/browse/STORM-3473
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-hive
>    Affects Versions: 2.0.0
>            Reporter: Alex Parrill
>            Priority: Major
>
> I'm trying to stream items from Storm into Hive using the HiveBolt, but Hive does not seem to see the records at all.
>
> Test program:
> {code:java}
> package com.datto.hivetest;
>
> import org.apache.storm.Config;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.AlreadyAliveException;
> import org.apache.storm.generated.AuthorizationException;
> import org.apache.storm.generated.InvalidTopologyException;
> import org.apache.storm.hive.bolt.HiveBolt;
> import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
> import org.apache.storm.hive.common.HiveOptions;
> import org.apache.storm.spout.SpoutOutputCollector;
> import org.apache.storm.streams.StreamBuilder;
> import org.apache.storm.task.TopologyContext;
> import org.apache.storm.topology.OutputFieldsDeclarer;
> import org.apache.storm.topology.base.BaseRichSpout;
> import org.apache.storm.tuple.Fields;
> import org.apache.storm.tuple.Values;
> import org.apache.storm.utils.Time;
>
> import java.util.Map;
> import java.util.Random;
>
> public class MainStorm {
>     public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
>         HiveOptions hiveOptions = new HiveOptions(
>                 "<url>",
>                 "default",
>                 "test_table",
>                 new JsonRecordHiveMapper()
>                         .withColumnFields(new Fields("value"))
>         )
>                 .withAutoCreatePartitions(true);
>
>         StreamBuilder builder = new StreamBuilder();
>         builder.newStream(new TestSpout())
>                 .map(tup -> tup.getStringByField("word").toLowerCase())
>                 .to(new HiveBolt(hiveOptions));
>
>         Config config = new Config();
>         config.setMessageTimeoutSecs(30);
>         config.setMaxSpoutPending(1024);
>         config.setClasspath("/etc/hadoop/conf/");
>
>         StormSubmitter.submitTopology("hive-test", config, builder.build());
>     }
>
>     public static class TestSpout extends BaseRichSpout {
>         private transient SpoutOutputCollector out;
>         private transient Random random;
>
>         @Override
>         public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
>             out = collector;
>             random = new Random();
>         }
>
>         @Override
>         public void nextTuple() {
>             try {
>                 Time.sleep(100);
>             } catch (InterruptedException e) {
>                 Thread.currentThread().interrupt();
>                 throw new RuntimeException(e);
>             }
>             final String[] words = new String[]{ "nathan", "mike", "jackson", "golda", "bertels" };
>             final String word = words[random.nextInt(words.length)];
>             out.emit(new Values(word));
>         }
>
>         @Override
>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>             declarer.declare(new Fields("word"));
>         }
>     }
> }
> {code}
> Table creation:
> {code:sql}
> CREATE TABLE test_table (value string) CLUSTERED BY (value) INTO 4 BUCKETS
>     STORED AS ORC TBLPROPERTIES('orc.compress' = 'ZLIB', 'transactional' = 'true');
> GRANT ALL ON test_table TO USER storm;
> {code}
> Setting the ACL:
> {code}
> sudo -u hdfs hdfs dfs -setfacl -m user:storm:rwx /warehouse/tablespace/managed/hive/test_table
> sudo -u hdfs hdfs dfs -setfacl -m default:user:storm:rwx /warehouse/tablespace/managed/hive/test_table
> {code}
> Hive results after running for around 10 minutes:
> {code:java}
> > SELECT COUNT(*) FROM test_table;
> INFO : Compiling command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403): SELECT COUNT(*) FROM test_table
> INFO : Semantic Analysis Completed (retrial = false)
> INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, type:bigint, comment:null)], properties:null)
> INFO : Completed compiling command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403); Time taken: 1.138 seconds
> INFO : Executing command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403): SELECT COUNT(*) FROM test_table
> INFO : Completed executing command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403); Time taken: 0.013 seconds
> INFO : OK
> +------+
> | _c0  |
> +------+
> | 0    |
> +------+
> {code}
> So Hive thinks there are no results, which isn't good. But if I look at HDFS, there are some files there:
> {code}
> # sudo -u hdfs hdfs dfs -ls -R -h /warehouse/tablespace/managed/hive/test_table
> drwxrwx---+  - storm hadoop      0 2019-07-22 19:15 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100
> -rw-rw----+  3 storm hadoop      1 2019-07-22 19:15 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/_orc_acid_version
> -rw-rw----+  3 storm hadoop 74.4 K 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00001
> -rw-rw----+  3 storm hadoop    376 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00001_flush_length
> -rw-rw----+  3 storm hadoop 73.4 K 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00002
> -rw-rw----+  3 storm hadoop    376 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00002_flush_length
> -rw-rw----+  3 storm hadoop 84.9 K 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00003
> -rw-rw----+  3 storm hadoop    376 2019-07-22 19:27 /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00003_flush_length
> {code}
> And they seem to have valid rows:
> {code}
> ❯❯❯ ./orc-contents /tmp/bucket_00002 | head
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 0, "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 1, "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 2, "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 3, "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 4, "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 5, "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 6, "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 7, "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 8, "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 9, "currentTransaction": 1, "row": {"value": "bertels"}}
> {code}
> I can insert into the table manually, and I've also written a test Java program that uses the Hive streaming API to write one row, and Hive sees those inserts. I don't see any errors in the Storm logs; the tuples seem to be flushed and acked OK. I don't think I've seen any errors in the metastore logs either.
>
> Anyone know what's up? I can get more info if needed.
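For reference, the one-row streaming test mentioned in the report could look roughly like the sketch below, written against the Hive 3.x streaming API ({{org.apache.hive:hive-streaming}}). This is a reconstruction, not the reporter's actual program: the class name is made up, and the metastore URI is a placeholder matching the {{"<url>"}} in the topology above.

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

// Hypothetical reconstruction of the "write one row" streaming test.
public class HiveStreamingOneRowTest {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Placeholder metastore URI, as in the topology above.
        conf.set("hive.metastore.uris", "<url>");

        // Parses delimited text into the table's columns (here just "value").
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
                .withFieldDelimiter(',')
                .build();

        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
                .withDatabase("default")
                .withTable("test_table")
                .withAgentInfo("one-row-test")
                .withRecordWriter(writer)
                .withHiveConf(conf)
                .connect();

        // Write a single row inside one transaction, then commit;
        // a subsequent SELECT COUNT(*) should see the insert.
        connection.beginTransaction();
        connection.write("bertels".getBytes(StandardCharsets.UTF_8));
        connection.commitTransaction();
        connection.close();
    }
}
{code}

Since a one-row write through this 3.x client is visible to Hive while the HiveBolt's writes are not, the mismatch points at the 2.x-era hcatalog streaming jars bundled with {{storm-hive}}, consistent with the fix above.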
"row": {"value": "bertels"}} > {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 8, > "currentTransaction": 1, "row": {"value": "bertels"}} > {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 9, > "currentTransaction": 1, "row": {"value": "bertels"}} > {code} > I can insert into the table manually, and I've also written a test java > program that uses the hive streaming API to write one row, and hive sees > those inserts. I don't see any errors in the storm logs; the tuples seem to > be flushed and acked ok. I don't think I've seen any errors in the metastore > logs either. > Anyone know what's up? I can get more info if needed. -- This message was sent by Atlassian JIRA (v7.6.14#76016)