Forgot to attach the Java source code. Attached now.
Sincerely,
Alex Parrill
On Wed, Jul 24, 2019 at 10:17 AM Alexander Parrill <[email protected]>
wrote:
> I'm trying to use the Hive streaming API to insert rows into Hive. I can
> see that delta files are created with the data, but when I query the table
> with SQL, Hive does not return any of the rows.
>
> I made test cases using both the old and the new streaming APIs; both show
> the same issue.
>
> Table creation:
>
> CREATE TABLE test_streaming_v1 (value string)
>   CLUSTERED BY (value) INTO 4 BUCKETS
>   STORED AS ORC
>   TBLPROPERTIES ('orc.compress' = 'ZLIB', 'transactional' = 'true');
> GRANT ALL ON test_streaming_v1 TO USER storm;
> sudo -u hdfs hdfs dfs -setfacl -m user:storm:rwx \
>     /warehouse/tablespace/managed/hive/test_streaming_v1
> sudo -u hdfs hdfs dfs -setfacl -m default:user:storm:rwx \
>     /warehouse/tablespace/managed/hive/test_streaming_v1
> # repeat for v2
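>
> For reference, the table metadata can be double-checked with something
> like the following; the table parameters should include
> 'transactional'='true':
>
> SHOW TBLPROPERTIES test_streaming_v1;
> DESCRIBE FORMATTED test_streaming_v1;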
>
> After running both programs, `SELECT COUNT(*)` on both tables returns 0.
> However, the delta files are clearly there in HDFS:
>
> # sudo -u hdfs hdfs dfs -ls -R -h
> /warehouse/tablespace/managed/hive/test_streaming_v1
> drwxrwx---+ - storm hadoop 0 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010
> -rw-rw----+ 3 storm hadoop 1 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/_orc_acid_version
> -rw-rw----+ 3 storm hadoop 1.1 K 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/bucket_00000
> -rw-rw----+ 3 storm hadoop 1.1 K 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/bucket_00001
> -rw-rw----+ 3 storm hadoop 1.1 K 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/bucket_00002
> -rw-rw----+ 3 storm hadoop 1.1 K 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/bucket_00003
> # sudo -u hdfs hdfs dfs -ls -R -h
> /warehouse/tablespace/managed/hive/test_streaming_v2
> drwxrwx---+ - storm hadoop 0 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001
> -rw-rw----+ 3 storm hadoop 1 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001/_orc_acid_version
> -rw-rw----+ 3 storm hadoop 974 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001/bucket_00000
> -rw-rw----+ 3 storm hadoop 989 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001/bucket_00001
> -rw-rw----+ 3 storm hadoop 983 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001/bucket_00002
> -rw-rw----+ 3 storm hadoop 997 2019-07-24 13:59
> /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001/bucket_00003
>
> And if I copy one of the bucket files to /tmp and examine its contents, it looks more or less correct:
>
> > ./orc-contents /tmp/bucket_00000 | head
> {"operation": 0, "originalTransaction": 1, "bucket": 536870912,
> "rowId": 0, "currentTransaction": 1, "row": {"value": "3"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 536870912,
> "rowId": 1, "currentTransaction": 1, "row": {"value": "10"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 536870912,
> "rowId": 2, "currentTransaction": 1, "row": {"value": "15"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 536870912,
> "rowId": 3, "currentTransaction": 1, "row": {"value": "18"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 536870912,
> "rowId": 4, "currentTransaction": 1, "row": {"value": "21"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 536870912,
> "rowId": 5, "currentTransaction": 1, "row": {"value": "24"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 536870912,
> "rowId": 6, "currentTransaction": 1, "row": {"value": "36"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 536870912,
> "rowId": 7, "currentTransaction": 1, "row": {"value": "37"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 536870912,
> "rowId": 8, "currentTransaction": 1, "row": {"value": "46"}}
> {"operation": 0, "originalTransaction": 1, "bucket": 536870912,
> "rowId": 9, "currentTransaction": 1, "row": {"value": "48"}}
>
> I've verified that Hive can read the bucket files even though the storm
> user owns them.
> The Java source and the logs from running both programs are attached.
> Hive server: 3.1.0
> Hive JARs: 3.1.1
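>
> In case it helps with diagnosing this, the transaction, lock, and
> compaction state tracked by the metastore can be inspected with standard
> statements such as:
>
> SHOW TRANSACTIONS;
> SHOW LOCKS;
> SHOW COMPACTIONS;
>
> My understanding is that if the streaming transactions had been aborted
> rather than committed, their delta files would remain on disk until
> cleanup but readers would filter them out, which would match the symptom
> here.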
>
> Does anyone have any ideas about what's going wrong and how to fix it?
>
> Sincerely,
> Alex Parrill
>
// MainV2.java -- test case using the new Hive streaming API (org.apache.hive.streaming)
package com.datto.hivetest;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictJsonWriter;
import picocli.CommandLine;

import java.util.Collections;
import java.util.concurrent.Callable;

@CommandLine.Command(name = "Stream using Hive Stream API V2", mixinStandardHelpOptions = true)
public class MainV2 implements Callable<Integer> {
    @CommandLine.Option(names = {"-t", "--table"}, description = "Table to write to")
    private String table = "test_table2";

    @CommandLine.Option(names = {"-i", "--iterations"}, description = "Number of rows to insert in one transaction")
    private int iterations = 50;

    @Override
    public Integer call() throws Exception {
        // Picks up cluster configuration (metastore URIs, etc.) from hive-site.xml on the classpath.
        HiveConf conf = new HiveConf();

        System.out.println("Connecting");
        StreamingConnection connection = HiveStreamingConnection.newBuilder()
                .withDatabase("default")
                .withTable(table)
                // The table is unpartitioned, so no static partition values.
                .withStaticPartitionValues(Collections.emptyList())
                .withAgentInfo("hive-test-thing")
                .withRecordWriter(StrictJsonWriter.newBuilder().build())
                .withHiveConf(conf)
                .connect();

        System.out.println("Beginning Transaction");
        connection.beginTransaction();

        System.out.println("Writing");
        for (int i = 0; i < iterations; i++) {
            // Each record is a JSON document matching the table schema (a single "value" column).
            connection.write(String.format("{\"value\": \"%d\"}", i).getBytes());
        }

        System.out.println("Committing Transaction");
        connection.commitTransaction();

        System.out.println("Closing");
        connection.close();
        return null;
    }

    public static void main(String[] args) {
        Integer returnCode = CommandLine.call(new MainV2(), System.err, args);
        if (returnCode != null) {
            System.exit(returnCode);
        }
    }
}

// MainV1.java -- test case using the old HCatalog streaming API (org.apache.hive.hcatalog.streaming)
package com.datto.hivetest;

import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import picocli.CommandLine;

import java.util.Collections;
import java.util.concurrent.Callable;

@CommandLine.Command(name = "Stream using Hive Stream API V1", mixinStandardHelpOptions = true)
public class MainV1 implements Callable<Integer> {
    @CommandLine.Option(names = {"-t", "--table"}, description = "Table to write to")
    private String table = "test_table2";

    @CommandLine.Option(names = {"-i", "--iterations"}, description = "Number of rows to insert in one transaction")
    private int iterations = 50;

    @Override
    public Integer call() throws Exception {
        System.out.println("Initializing");
        // Endpoint pointing at the metastore instances; the table is unpartitioned,
        // so the partition value list is empty.
        HiveEndPoint ep = new HiveEndPoint(
                "thrift://use1-hadoop-3.datto.lan:9083,thrift://use1-hadoop-5.datto.lan:9083",
                "default",
                table,
                Collections.emptyList()
        );
        StreamingConnection conn = ep.newConnection(true);
        // Fetch a batch of 10 transactions; all rows are written within the first one.
        TransactionBatch batch = conn.fetchTransactionBatch(10, new StrictJsonWriter(ep));

        System.out.println("Writing");
        batch.beginNextTransaction();
        for (int i = 0; i < iterations; i++) {
            batch.write(String.format("{\"value\": \"%d\"}", i).getBytes());
        }
        batch.commit();

        System.out.println("Closing");
        batch.close();
        conn.close();
        return null;
    }

    public static void main(String[] args) {
        Integer returnCode = CommandLine.call(new MainV1(), System.err, args);
        if (returnCode != null) {
            System.exit(returnCode);
        }
    }
}