Forgot the Java source code. Attached.
Sincerely,
Alex Parrill

On Wed, Jul 24, 2019 at 10:17 AM Alexander Parrill <[email protected]>
wrote:

> I'm trying to use the Hive streaming API to put rows into Hive. I can see
> that delta files are created containing the data, but when I query the
> table with SQL, Hive does not return any of the rows.
>
> I made test cases using both the old and new streaming APIs; both have the
> same issue.
>
> Table creation:
>
>     CREATE TABLE test_streaming_v1 (value string)
>         CLUSTERED BY (value) INTO 4 BUCKETS
>         STORED AS ORC
>         TBLPROPERTIES ('orc.compress' = 'ZLIB', 'transactional' = 'true');
>     GRANT ALL ON test_streaming_v1 TO USER storm;
>     sudo -u hdfs hdfs dfs -setfacl -m user:storm:rwx /warehouse/tablespace/managed/hive/test_streaming_v1
>     sudo -u hdfs hdfs dfs -setfacl -m default:user:storm:rwx /warehouse/tablespace/managed/hive/test_streaming_v1
>     # repeat for v2
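>
> The check after each test run is just a plain row count on each table:
>
>     SELECT COUNT(*) FROM test_streaming_v1;
>     SELECT COUNT(*) FROM test_streaming_v2;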
>
> After running both test programs, the row count from each table comes back
> as 0. However, the delta files are clearly present in HDFS:
>
>     # sudo -u hdfs hdfs dfs -ls -R -h /warehouse/tablespace/managed/hive/test_streaming_v1
>     drwxrwx---+  - storm hadoop       0 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010
>     -rw-rw----+  3 storm hadoop       1 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/_orc_acid_version
>     -rw-rw----+  3 storm hadoop   1.1 K 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/bucket_00000
>     -rw-rw----+  3 storm hadoop   1.1 K 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/bucket_00001
>     -rw-rw----+  3 storm hadoop   1.1 K 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/bucket_00002
>     -rw-rw----+  3 storm hadoop   1.1 K 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/bucket_00003
>     # sudo -u hdfs hdfs dfs -ls -R -h /warehouse/tablespace/managed/hive/test_streaming_v2
>     drwxrwx---+  - storm hadoop       0 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001
>     -rw-rw----+  3 storm hadoop       1 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001/_orc_acid_version
>     -rw-rw----+  3 storm hadoop     974 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001/bucket_00000
>     -rw-rw----+  3 storm hadoop     989 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001/bucket_00001
>     -rw-rw----+  3 storm hadoop     983 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001/bucket_00002
>     -rw-rw----+  3 storm hadoop     997 2019-07-24 13:59 /warehouse/tablespace/managed/hive/test_streaming_v2/delta_0000001_0000001/bucket_00003
>
> And if I examine the contents of one of the bucket files, it looks more or
> less correct:
>
>     > ./orc-contents /tmp/bucket_00000 | head
>     {"operation": 0, "originalTransaction": 1, "bucket": 536870912, "rowId": 0, "currentTransaction": 1, "row": {"value": "3"}}
>     {"operation": 0, "originalTransaction": 1, "bucket": 536870912, "rowId": 1, "currentTransaction": 1, "row": {"value": "10"}}
>     {"operation": 0, "originalTransaction": 1, "bucket": 536870912, "rowId": 2, "currentTransaction": 1, "row": {"value": "15"}}
>     {"operation": 0, "originalTransaction": 1, "bucket": 536870912, "rowId": 3, "currentTransaction": 1, "row": {"value": "18"}}
>     {"operation": 0, "originalTransaction": 1, "bucket": 536870912, "rowId": 4, "currentTransaction": 1, "row": {"value": "21"}}
>     {"operation": 0, "originalTransaction": 1, "bucket": 536870912, "rowId": 5, "currentTransaction": 1, "row": {"value": "24"}}
>     {"operation": 0, "originalTransaction": 1, "bucket": 536870912, "rowId": 6, "currentTransaction": 1, "row": {"value": "36"}}
>     {"operation": 0, "originalTransaction": 1, "bucket": 536870912, "rowId": 7, "currentTransaction": 1, "row": {"value": "37"}}
>     {"operation": 0, "originalTransaction": 1, "bucket": 536870912, "rowId": 8, "currentTransaction": 1, "row": {"value": "46"}}
>     {"operation": 0, "originalTransaction": 1, "bucket": 536870912, "rowId": 9, "currentTransaction": 1, "row": {"value": "48"}}
>
> I've verified that the hive user can read the bucket files even though the
> storm user owns them.
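>
> Concretely, any read of a bucket file as the hive user succeeds, for example
> (the exact command here is just illustrative):
>
>     sudo -u hive hdfs dfs -cat /warehouse/tablespace/managed/hive/test_streaming_v1/delta_0000001_0000010/bucket_00000 > /dev/null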
> Java source and logs from running the test programs are attached.
> Hive server: 3.1.0
> Hive client JARs: 3.1.1
>
> Does anyone have any ideas about what's going wrong and how to fix it?
>
> Sincerely,
> Alex Parrill
>
package com.datto.hivetest;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictJsonWriter;
import picocli.CommandLine;

import java.util.Collections;
import java.util.concurrent.Callable;

@CommandLine.Command(name = "Stream using Hive Stream API V2", mixinStandardHelpOptions = true)
public class MainV2 implements Callable<Integer> {
	@CommandLine.Option(names={"-t", "--table"}, description = "Table to write to")
	private String table = "test_table2";

	@CommandLine.Option(names={"-i", "--iterations"}, description = "Number of rows to insert in one transaction")
	private int iterations = 50;

	@Override
	public Integer call() throws Exception {
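		// Write `iterations` JSON rows ({"value": "<n>"}) to the target table in a
		// single transaction using the streaming API v2. Hive/metastore settings are
		// taken from the hive-site.xml that HiveConf picks up from the classpath.
		// Example invocation (classpath is illustrative only):
		//   java -cp <test jar + Hive 3.1.1 client jars> com.datto.hivetest.MainV2 -t test_streaming_v2 -i 50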
		HiveConf conf = new HiveConf();
		System.out.println("Connecting");
		StreamingConnection connection = HiveStreamingConnection.newBuilder()
			.withDatabase("default")
			.withTable(table)
			.withStaticPartitionValues(Collections.emptyList())
			.withAgentInfo("hive-test-thing")
			.withRecordWriter(StrictJsonWriter.newBuilder().build())
			.withHiveConf(conf)
			.connect();

		System.out.println("Beginning Transaction");
		connection.beginTransaction();
		System.out.println("Writing");
		for(int i=0; i<iterations; i++) {
			connection.write(String.format("{\"value\": \"%d\"}", i).getBytes());
		}
		System.out.println("Committing Transaction");
		connection.commitTransaction();
		System.out.println("Closing");
		connection.close();
		return null;
	}

	public static void main(String[] args) {
		Integer returnCode = CommandLine.call(new MainV2(), System.err, args);
		if (returnCode != null) {
			System.exit(returnCode);
		}
	}
}
package com.datto.hivetest;

import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import picocli.CommandLine;

import java.util.Collections;
import java.util.concurrent.Callable;

@CommandLine.Command(name = "Stream using Hive Stream API V1", mixinStandardHelpOptions = true)
public class MainV1 implements Callable<Integer> {

	@CommandLine.Option(names={"-t", "--table"}, description = "Table to write to")
	private String table = "test_table2";

	@CommandLine.Option(names={"-i", "--iterations"}, description = "Number of rows to insert in one transaction")
	private int iterations = 50;


	public Integer call() throws Exception {
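		// Same test as MainV2, but via the older hcatalog streaming API: connect to
		// the metastore, fetch a transaction batch, write `iterations` JSON rows in
		// the first transaction, commit, and close.
		// Example invocation (classpath is illustrative only):
		//   java -cp <test jar + Hive 3.1.1 client jars> com.datto.hivetest.MainV1 -t test_streaming_v1 -i 50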
		System.out.println("Initializing");
		HiveEndPoint ep = new HiveEndPoint(
			"thrift://use1-hadoop-3.datto.lan:9083,thrift://use1-hadoop-5.datto.lan:9083",
			"default",
			table,
			Collections.emptyList()
		);
		StreamingConnection conn = ep.newConnection(true);
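		// Request a batch of 10 transactions; only the first one is used below.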
		TransactionBatch batch = conn.fetchTransactionBatch(10, new StrictJsonWriter(ep));
		System.out.println("Writing");
		batch.beginNextTransaction();
		for(int i=0; i<iterations; i++) {
			batch.write(String.format("{\"value\": \"%d\"}", i).getBytes());
		}
		batch.commit();
		System.out.println("Closing");
		batch.close();
		conn.close();
		return null;
	}

	public static void main(String[] args) {
		Integer returnCode = CommandLine.call(new MainV1(), System.err, args);
		if (returnCode != null) {
			System.exit(returnCode);
		}
	}
}
