[ 
https://issues.apache.org/jira/browse/STORM-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16894017#comment-16894017
 ] 

Alex Parrill commented on STORM-3473:
-------------------------------------

Found a fix. {{storm-hive}} depends on 
{{org.apache.hive.hcatalog:hive-hcatalog-core}} and {{hive-hcatalog-streaming}} 
version 2.8.5. Upgrading to version 3.1.1 of those jars (by excluding them in 
the {{storm-hive}} dependency and manually depending on new version of the 
jars) solves the issue and hive now sees the writes.

> Hive can't read records written from HiveBolt
> ---------------------------------------------
>
>                 Key: STORM-3473
>                 URL: https://issues.apache.org/jira/browse/STORM-3473
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-hive
>    Affects Versions: 2.0.0
>            Reporter: Alex Parrill
>            Priority: Major
>
> I'm trying to stream items from storm into hive using the HiveBolt, but Hive 
> does not seem to see the records at all.
> Test program:
> {code:java}
> package com.datto.hivetest;
> import org.apache.storm.Config;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.AlreadyAliveException;
> import org.apache.storm.generated.AuthorizationException;
> import org.apache.storm.generated.InvalidTopologyException;
> import org.apache.storm.hive.bolt.HiveBolt;
> import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
> import org.apache.storm.hive.common.HiveOptions;
> import org.apache.storm.spout.SpoutOutputCollector;
> import org.apache.storm.streams.StreamBuilder;
> import org.apache.storm.task.TopologyContext;
> import org.apache.storm.topology.OutputFieldsDeclarer;
> import org.apache.storm.topology.base.BaseRichSpout;
> import org.apache.storm.tuple.Fields;
> import org.apache.storm.tuple.Values;
> import org.apache.storm.utils.Time;
> import java.util.Map;
> import java.util.Random;
> public class MainStorm {
>       public static void main(String[] args) throws InvalidTopologyException, 
> AuthorizationException, AlreadyAliveException {
>               HiveOptions hiveOptions = new HiveOptions(
>                       "<url>",
>                       "default",
>                       "test_table",
>                       new JsonRecordHiveMapper()
>                               .withColumnFields(new Fields("value"))
>               )
>                       .withAutoCreatePartitions(true);
>               StreamBuilder builder = new StreamBuilder();
>               builder.newStream(new TestSpout())
>                       .map(tup -> tup.getStringByField("word").toLowerCase())
>                       .to(new HiveBolt(hiveOptions));
>               Config config = new Config();
>               config.setMessageTimeoutSecs(30);
>               config.setMaxSpoutPending(1024);
>               config.setClasspath("/etc/hadoop/conf/");
>               StormSubmitter.submitTopology("hive-test", config, 
> builder.build());
>       }
>       public static class TestSpout extends BaseRichSpout {
>               private transient SpoutOutputCollector out;
>               private transient Random random;
>               @Override
>               public void open(Map<String, Object> conf, TopologyContext 
> context, SpoutOutputCollector collector) {
>                       out = collector;
>                       random = new Random();
>               }
>               @Override
>               public void nextTuple() {
>                       try {
>                               Time.sleep(100);
>                       } catch (InterruptedException e) {
>                               Thread.currentThread().interrupt();
>                               throw new RuntimeException(e);
>                       }
>                       final String[] words = new String[]{ "nathan", "mike", 
> "jackson", "golda", "bertels" };
>                       final String word = words[random.nextInt(words.length)];
>                       out.emit(new Values(word));
>               }
>               @Override
>               public void declareOutputFields(OutputFieldsDeclarer declarer) {
>                       declarer.declare(new Fields("word"));
>               }
>       }
> }
> {code}
> Table creation:
> {code:sql}
> CREATE TABLE test_table (value string) CLUSTERED BY (value) INTO 4 BUCKETS 
> STORED AS ORC TBLPROPERTIES('orc.compress' = 'ZLIB', 'transactional' = 
> 'true');
> GRANT ALL ON test_table TO USER storm;{code}
> Setting the ACL:
> {code}
> sudo -u hdfs hdfs dfs -setfacl -m user:storm:rwx 
> /warehouse/tablespace/managed/hive/test_table
> sudo -u hdfs hdfs dfs -setfacl -m default:user:storm:rwx 
> /warehouse/tablespace/managed/hive/test_table
> {code}
> Hive results after running for around 10 minutes:
> {code:java}
> > SELECT COUNT(*) FROM test_table;
> INFO  : Compiling 
> command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403): 
> SELECT COUNT(*) FROM test_table
> INFO  : Semantic Analysis Completed (retrial = false)
> INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, 
> type:bigint, comment:null)], properties:null)
> INFO  : Completed compiling 
> command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403); 
> Time taken: 1.138 seconds
> INFO  : Executing 
> command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403): 
> SELECT COUNT(*) FROM test_table
> INFO  : Completed executing 
> command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403); 
> Time taken: 0.013 seconds
> INFO  : OK
> +------+
> | _c0  |
> +------+
> | 0    |
> +------+
> {code}
> So hive thinks there are no results, which isn't good. But if I look at hdfs, 
> there are some files there:
> {code}
> # sudo -u hdfs hdfs dfs -ls -R -h 
> /warehouse/tablespace/managed/hive/test_table
> drwxrwx---+  - storm hadoop          0 2019-07-22 19:15 
> /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100
> -rw-rw----+  3 storm hadoop          1 2019-07-22 19:15 
> /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/_orc_acid_version
> -rw-rw----+  3 storm hadoop     74.4 K 2019-07-22 19:27 
> /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00001
> -rw-rw----+  3 storm hadoop        376 2019-07-22 19:27 
> /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00001_flush_length
> -rw-rw----+  3 storm hadoop     73.4 K 2019-07-22 19:27 
> /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00002
> -rw-rw----+  3 storm hadoop        376 2019-07-22 19:27 
> /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00002_flush_length
> -rw-rw----+  3 storm hadoop     84.9 K 2019-07-22 19:27 
> /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00003
> -rw-rw----+  3 storm hadoop        376 2019-07-22 19:27 
> /warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00003_flush_length
> {code}
> And they seem to have valid rows:
> {code}
> ❯❯❯ ./orc-contents /tmp/bucket_00002  | head
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 0, 
> "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 1, 
> "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 2, 
> "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 3, 
> "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 4, 
> "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 5, 
> "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 6, 
> "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 7, 
> "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 8, 
> "currentTransaction": 1, "row": {"value": "bertels"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 9, 
> "currentTransaction": 1, "row": {"value": "bertels"}}
> {code}
> I can insert into the table manually, and I've also written a test java 
> program that uses the hive streaming API to write one row, and hive sees 
> those inserts. I don't see any errors in the storm logs; the tuples seem to 
> be flushed and acked ok. I don't think I've seen any errors in the metastore 
> logs either.
> Anyone know what's up? I can get more info if needed.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to