This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 381123e Call VerifySuccess before return to user (#151) (#157)
381123e is described below
commit 381123ebae95ecf6d6ae783ae4a1ff1087f2aaf2
Author: Haonan <[email protected]>
AuthorDate: Thu Mar 26 14:33:05 2026 +0800
Call VerifySuccess before return to user (#151) (#157)
* Move VerifySuccess
* fix missing code
* fix missing code
* fix copilot review
---
client/errors.go | 30 ++--
client/session.go | 252 ++++++++++++++++++---------
client/utils.go | 8 +-
example/session_example.go | 19 +-
example/session_pool/session_pool_example.go | 17 +-
test/e2e/e2e_test.go | 29 ++-
6 files changed, 211 insertions(+), 144 deletions(-)
diff --git a/client/errors.go b/client/errors.go
index 2c54bde..66ead4f 100644
--- a/client/errors.go
+++ b/client/errors.go
@@ -20,28 +20,34 @@
package client
import (
- "bytes"
+ "fmt"
+
"github.com/apache/iotdb-client-go/common"
)
+// ExecutionError represents an error returned by the server via TSStatus.
+// It is NOT a connection error and should not cause session drops.
+type ExecutionError struct {
+ Code int32
+ Message string
+}
+
+func (e *ExecutionError) Error() string {
+ if e.Message != "" {
+ return fmt.Sprintf("error code: %d, message: %v", e.Code, e.Message)
+ }
+ return fmt.Sprintf("error code: %d", e.Code)
+}
+
type BatchError struct {
statuses []*common.TSStatus
+ Message string
}
func (e *BatchError) Error() string {
- buff := bytes.Buffer{}
- for _, status := range e.statuses {
- buff.WriteString(*status.Message + ";")
- }
- return buff.String()
+ return e.Message
}
func (e *BatchError) GetStatuses() []*common.TSStatus {
return e.statuses
}
-
-func NewBatchError(statuses []*common.TSStatus) *BatchError {
- return &BatchError{
- statuses: statuses,
- }
-}
diff --git a/client/session.go b/client/session.go
index fff4cf0..98a9fee 100644
--- a/client/session.go
+++ b/client/session.go
@@ -186,14 +186,17 @@ func (s *Session) Close() error {
*return
*error: correctness of operation
*/
-func (s *Session) SetStorageGroup(storageGroupId string) (r *common.TSStatus,
err error) {
- r, err = s.client.SetStorageGroup(context.Background(), s.sessionId,
storageGroupId)
+func (s *Session) SetStorageGroup(storageGroupId string) error {
+ r, err := s.client.SetStorageGroup(context.Background(), s.sessionId,
storageGroupId)
if err != nil && r == nil {
if s.reconnect() {
r, err = s.client.SetStorageGroup(context.Background(),
s.sessionId, storageGroupId)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -203,14 +206,17 @@ func (s *Session) SetStorageGroup(storageGroupId string)
(r *common.TSStatus, er
*return
*error: correctness of operation
*/
-func (s *Session) DeleteStorageGroup(storageGroupId string) (r
*common.TSStatus, err error) {
- r, err = s.client.DeleteStorageGroups(context.Background(),
s.sessionId, []string{storageGroupId})
+func (s *Session) DeleteStorageGroup(storageGroupId string) error {
+ r, err := s.client.DeleteStorageGroups(context.Background(),
s.sessionId, []string{storageGroupId})
if err != nil && r == nil {
if s.reconnect() {
r, err =
s.client.DeleteStorageGroups(context.Background(), s.sessionId,
[]string{storageGroupId})
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -220,14 +226,17 @@ func (s *Session) DeleteStorageGroup(storageGroupId
string) (r *common.TSStatus,
*return
*error: correctness of operation
*/
-func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r
*common.TSStatus, err error) {
- r, err = s.client.DeleteStorageGroups(context.Background(),
s.sessionId, storageGroupIds)
+func (s *Session) DeleteStorageGroups(storageGroupIds ...string) error {
+ r, err := s.client.DeleteStorageGroups(context.Background(),
s.sessionId, storageGroupIds)
if err != nil && r == nil {
if s.reconnect() {
r, err =
s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -240,9 +249,11 @@ func (s *Session) DeleteStorageGroups(storageGroupIds
...string) (r *common.TSSt
*return
*error: correctness of operation
*/
-func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding
TSEncoding, compressor TSCompressionType, attributes map[string]string, tags
map[string]string) (r *common.TSStatus, err error) {
- request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path:
path, DataType: int32(dataType), Encoding: int32(encoding),
- Compressor: int32(compressor), Attributes: attributes, Tags:
tags}
+func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding
TSEncoding, compressor TSCompressionType, attributes map[string]string, tags
map[string]string) error {
+ request := rpc.TSCreateTimeseriesReq{
+ SessionId: s.sessionId, Path: path, DataType: int32(dataType),
Encoding: int32(encoding),
+ Compressor: int32(compressor), Attributes: attributes, Tags:
tags,
+ }
status, err := s.client.CreateTimeseries(context.Background(), &request)
if err != nil && status == nil {
if s.reconnect() {
@@ -250,7 +261,10 @@ func (s *Session) CreateTimeseries(path string, dataType
TSDataType, encoding TS
status, err =
s.client.CreateTimeseries(context.Background(), &request)
}
}
- return status, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(status)
}
/*
@@ -265,7 +279,7 @@ func (s *Session) CreateTimeseries(path string, dataType
TSDataType, encoding TS
*return
*error: correctness of operation
*/
-func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements
[]string, dataTypes []TSDataType, encodings []TSEncoding, compressors
[]TSCompressionType, measurementAlias []string) (r *common.TSStatus, err error)
{
+func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements
[]string, dataTypes []TSDataType, encodings []TSEncoding, compressors
[]TSCompressionType, measurementAlias []string) error {
destTypes := make([]int32, len(dataTypes))
for i, t := range dataTypes {
destTypes[i] = int32(t)
@@ -297,7 +311,10 @@ func (s *Session) CreateAlignedTimeseries(prefixPath
string, measurements []stri
status, err =
s.client.CreateAlignedTimeseries(context.Background(), &request)
}
}
- return status, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(status)
}
/*
@@ -310,7 +327,7 @@ func (s *Session) CreateAlignedTimeseries(prefixPath
string, measurements []stri
*return
*error: correctness of operation
*/
-func (s *Session) CreateMultiTimeseries(paths []string, dataTypes
[]TSDataType, encodings []TSEncoding, compressors []TSCompressionType) (r
*common.TSStatus, err error) {
+func (s *Session) CreateMultiTimeseries(paths []string, dataTypes
[]TSDataType, encodings []TSEncoding, compressors []TSCompressionType) error {
destTypes := make([]int32, len(dataTypes))
for i, t := range dataTypes {
destTypes[i] = int32(t)
@@ -326,9 +343,11 @@ func (s *Session) CreateMultiTimeseries(paths []string,
dataTypes []TSDataType,
destCompressions[i] = int32(e)
}
- request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId,
Paths: paths, DataTypes: destTypes,
- Encodings: destEncodings, Compressors: destCompressions}
- r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
+ request := rpc.TSCreateMultiTimeseriesReq{
+ SessionId: s.sessionId, Paths: paths, DataTypes: destTypes,
+ Encodings: destEncodings, Compressors: destCompressions,
+ }
+ r, err := s.client.CreateMultiTimeseries(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
@@ -337,7 +356,10 @@ func (s *Session) CreateMultiTimeseries(paths []string,
dataTypes []TSDataType,
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -347,14 +369,17 @@ func (s *Session) CreateMultiTimeseries(paths []string,
dataTypes []TSDataType,
*return
*error: correctness of operation
*/
-func (s *Session) DeleteTimeseries(paths []string) (r *common.TSStatus, err
error) {
- r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId,
paths)
+func (s *Session) DeleteTimeseries(paths []string) error {
+ r, err := s.client.DeleteTimeseries(context.Background(), s.sessionId,
paths)
if err != nil && r == nil {
if s.reconnect() {
r, err =
s.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -366,16 +391,19 @@ func (s *Session) DeleteTimeseries(paths []string) (r
*common.TSStatus, err erro
*return
*error: correctness of operation
*/
-func (s *Session) DeleteData(paths []string, startTime int64, endTime int64)
(r *common.TSStatus, err error) {
+func (s *Session) DeleteData(paths []string, startTime int64, endTime int64)
error {
request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths,
StartTime: startTime, EndTime: endTime}
- r, err = s.client.DeleteData(context.Background(), &request)
+ r, err := s.client.DeleteData(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.DeleteData(context.Background(),
&request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -388,17 +416,22 @@ func (s *Session) DeleteData(paths []string, startTime
int64, endTime int64) (r
*return
*error: correctness of operation
*/
-func (s *Session) InsertStringRecord(deviceId string, measurements []string,
values []string, timestamp int64) (r *common.TSStatus, err error) {
- request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId,
PrefixPath: deviceId, Measurements: measurements,
- Values: values, Timestamp: timestamp}
- r, err = s.client.InsertStringRecord(context.Background(), &request)
+func (s *Session) InsertStringRecord(deviceId string, measurements []string,
values []string, timestamp int64) error {
+ request := rpc.TSInsertStringRecordReq{
+ SessionId: s.sessionId, PrefixPath: deviceId, Measurements:
measurements,
+ Values: values, Timestamp: timestamp,
+ }
+ r, err := s.client.InsertStringRecord(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err =
s.client.InsertStringRecord(context.Background(), &request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
func (s *Session) GetTimeZone() (string, error) {
@@ -409,11 +442,17 @@ func (s *Session) GetTimeZone() (string, error) {
return resp.TimeZone, nil
}
-func (s *Session) SetTimeZone(timeZone string) (r *common.TSStatus, err error)
{
+func (s *Session) SetTimeZone(timeZone string) error {
request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone:
timeZone}
- r, err = s.client.SetTimeZone(context.Background(), &request)
+ r, err := s.client.SetTimeZone(context.Background(), &request)
+ if err != nil {
+ return err
+ }
+ if err := VerifySuccess(r); err != nil {
+ return err
+ }
s.config.TimeZone = timeZone
- return r, err
+ return nil
}
func (s *Session) ExecuteStatementWithContext(ctx context.Context, sql string)
(*SessionDataSet, error) {
@@ -444,7 +483,7 @@ func (s *Session) ExecuteStatement(sql string)
(*SessionDataSet, error) {
return s.ExecuteStatementWithContext(context.Background(), sql)
}
-func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus,
err error) {
+func (s *Session) ExecuteNonQueryStatement(sql string) error {
request := rpc.TSExecuteStatementReq{
SessionId: s.sessionId,
Statement: sql,
@@ -460,8 +499,10 @@ func (s *Session) ExecuteNonQueryStatement(sql string) (r
*common.TSStatus, err
resp, err =
s.client.ExecuteStatementV2(context.Background(), &request)
}
}
-
- return resp.Status, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(resp.Status)
}
func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64)
(*SessionDataSet, error) {
@@ -560,12 +601,12 @@ func (s *Session) genTSInsertRecordReq(deviceId string,
time int64,
return request, nil
}
-func (s *Session) InsertRecord(deviceId string, measurements []string,
dataTypes []TSDataType, values []interface{}, timestamp int64) (r
*common.TSStatus, err error) {
+func (s *Session) InsertRecord(deviceId string, measurements []string,
dataTypes []TSDataType, values []interface{}, timestamp int64) error {
request, err := s.genTSInsertRecordReq(deviceId, timestamp,
measurements, dataTypes, values, false)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertRecord(context.Background(), request)
+ r, err := s.client.InsertRecord(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
@@ -574,15 +615,18 @@ func (s *Session) InsertRecord(deviceId string,
measurements []string, dataTypes
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) InsertAlignedRecord(deviceId string, measurements []string,
dataTypes []TSDataType, values []interface{}, timestamp int64) (r
*common.TSStatus, err error) {
+func (s *Session) InsertAlignedRecord(deviceId string, measurements []string,
dataTypes []TSDataType, values []interface{}, timestamp int64) error {
request, err := s.genTSInsertRecordReq(deviceId, timestamp,
measurements, dataTypes, values, true)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertRecord(context.Background(), request)
+ r, err := s.client.InsertRecord(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
@@ -591,7 +635,10 @@ func (s *Session) InsertAlignedRecord(deviceId string,
measurements []string, da
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
type deviceData struct {
@@ -620,11 +667,11 @@ func (d *deviceData) Swap(i, j int) {
// InsertRecordsOfOneDevice Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
// executeBatch, we pack some insert request in batch and send them to server. If you want improve
// your performance, please see insertTablet method
-// Each row is independent, which could have different deviceId, time, number of measurements
-func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) {
+// Each row is independent, which could have different insertTargetName, time, number of measurements
+func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) error {
length := len(timestamps)
if len(measurementsSlice) != length || len(dataTypesSlice) != length ||
len(valuesSlice) != length {
- return nil, errors.New("timestamps, measurementsSlice and
valuesSlice's size should be equal")
+ return errors.New("timestamps, measurementsSlice and
valuesSlice's size should be equal")
}
if !sorted {
@@ -636,10 +683,11 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId
string, timestamps []int64,
})
}
+ var err error
valuesList := make([][]byte, length)
for i := 0; i < length; i++ {
if valuesList[i], err = valuesToBytes(dataTypesSlice[i],
valuesSlice[i], measurementsSlice[i]); err != nil {
- return nil, err
+ return err
}
}
@@ -651,7 +699,7 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId string,
timestamps []int64,
ValuesList: valuesList,
}
- r, err = s.client.InsertRecordsOfOneDevice(context.Background(),
request)
+ r, err := s.client.InsertRecordsOfOneDevice(context.Background(),
request)
if err != nil && r == nil {
if s.reconnect() {
@@ -660,13 +708,16 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId
string, timestamps []int64,
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps
[]int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType,
valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) {
+func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps
[]int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType,
valuesSlice [][]interface{}, sorted bool) error {
length := len(timestamps)
if len(measurementsSlice) != length || len(dataTypesSlice) != length ||
len(valuesSlice) != length {
- return nil, errors.New("timestamps, measurementsSlice and
valuesSlice's size should be equal")
+ return errors.New("timestamps, measurementsSlice and
valuesSlice's size should be equal")
}
if !sorted {
@@ -678,10 +729,11 @@ func (s *Session)
InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []
})
}
+ var err error
valuesList := make([][]byte, length)
for i := 0; i < length; i++ {
if valuesList[i], err = valuesToBytes(dataTypesSlice[i],
valuesSlice[i], measurementsSlice[i]); err != nil {
- return nil, err
+ return err
}
}
var isAligned = true
@@ -694,7 +746,7 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId
string, timestamps []
IsAligned: &isAligned,
}
- r, err = s.client.InsertRecordsOfOneDevice(context.Background(),
request)
+ r, err := s.client.InsertRecordsOfOneDevice(context.Background(),
request)
if err != nil && r == nil {
if s.reconnect() {
@@ -703,7 +755,10 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId
string, timestamps []
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -719,36 +774,44 @@ func (s *Session)
InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []
*
*/
func (s *Session) InsertRecords(deviceIds []string, measurements [][]string,
dataTypes [][]TSDataType, values [][]interface{},
- timestamps []int64) (r *common.TSStatus, err error) {
+ timestamps []int64,
+) error {
request, err := s.genInsertRecordsReq(deviceIds, measurements,
dataTypes, values, timestamps, false)
if err != nil {
- return nil, err
+ return err
} else {
- r, err = s.client.InsertRecords(context.Background(), request)
+ r, err := s.client.InsertRecords(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err =
s.client.InsertRecords(context.Background(), request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
}
func (s *Session) InsertAlignedRecords(deviceIds []string, measurements
[][]string, dataTypes [][]TSDataType, values [][]interface{},
- timestamps []int64) (r *common.TSStatus, err error) {
+ timestamps []int64,
+) error {
request, err := s.genInsertRecordsReq(deviceIds, measurements,
dataTypes, values, timestamps, true)
if err != nil {
- return nil, err
+ return err
} else {
- r, err = s.client.InsertRecords(context.Background(), request)
+ r, err := s.client.InsertRecords(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err =
s.client.InsertRecords(context.Background(), request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
}
@@ -757,63 +820,72 @@ func (s *Session) InsertAlignedRecords(deviceIds
[]string, measurements [][]stri
*params
*tablets: []*client.Tablet, list of tablets
*/
-func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r
*common.TSStatus, err error) {
+func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) error {
if !sorted {
for _, t := range tablets {
if err := t.Sort(); err != nil {
- return nil, err
+ return err
}
}
}
request, err := s.genInsertTabletsReq(tablets, false)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertTablets(context.Background(), request)
+ r, err := s.client.InsertTablets(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertTablets(context.Background(),
request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) (r
*common.TSStatus, err error) {
+func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) error {
if !sorted {
for _, t := range tablets {
if err := t.Sort(); err != nil {
- return nil, err
+ return err
}
}
}
request, err := s.genInsertTabletsReq(tablets, true)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertTablets(context.Background(), request)
+ r, err := s.client.InsertTablets(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertTablets(context.Background(),
request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) ExecuteBatchStatement(inserts []string) (r *common.TSStatus,
err error) {
+func (s *Session) ExecuteBatchStatement(inserts []string) error {
request := rpc.TSExecuteBatchStatementReq{
SessionId: s.sessionId,
Statements: inserts,
}
- r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
+ r, err := s.client.ExecuteBatchStatement(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err =
s.client.ExecuteBatchStatement(context.Background(), &request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime
int64) (*SessionDataSet, error) {
@@ -1020,18 +1092,18 @@ func valuesToBytes(dataTypes []TSDataType, values
[]interface{}, measurementName
return buff.Bytes(), nil
}
-func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r
*common.TSStatus, err error) {
+func (s *Session) InsertTablet(tablet *Tablet, sorted bool) error {
if !sorted {
if err := tablet.Sort(); err != nil {
- return nil, err
+ return err
}
}
request, err := s.genTSInsertTabletReq(tablet, false)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertTablet(context.Background(), request)
+ r, err := s.client.InsertTablet(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
@@ -1040,21 +1112,24 @@ func (s *Session) InsertTablet(tablet *Tablet, sorted
bool) (r *common.TSStatus,
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r
*common.TSStatus, err error) {
+func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) error {
if !sorted {
if err := tablet.Sort(); err != nil {
- return nil, err
+ return err
}
}
request, err := s.genTSInsertTabletReq(tablet, true)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertTablet(context.Background(), request)
+ r, err := s.client.InsertTablet(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
@@ -1063,7 +1138,10 @@ func (s *Session) InsertAlignedTablet(tablet *Tablet,
sorted bool) (r *common.TS
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool)
(*rpc.TSInsertTabletReq, error) {
diff --git a/client/utils.go b/client/utils.go
index 41cc783..0f05dac 100644
--- a/client/utils.go
+++ b/client/utils.go
@@ -124,7 +124,7 @@ func verifySuccesses(statuses []*common.TSStatus) error {
}
errMsg := buff.String()
if len(errMsg) > 0 {
- return NewBatchError(statuses)
+ return &BatchError{statuses, errMsg}
}
return nil
}
@@ -141,11 +141,11 @@ func VerifySuccess(status *common.TSStatus) error {
return nil
}
if status.Code != SuccessStatus {
+ msg := ""
if status.Message != nil {
- return fmt.Errorf("error code: %d, message: %v", status.Code, *status.Message)
- } else {
- return fmt.Errorf("error code: %d", status.Code)
+ msg = *status.Message
}
+ return &ExecutionError{Code: status.Code, Message: msg}
}
return nil
}
diff --git a/example/session_example.go b/example/session_example.go
index 1b31514..c5d1397 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -22,12 +22,13 @@ package main
import (
"flag"
"fmt"
- "github.com/apache/iotdb-client-go/common"
"log"
"math/rand"
"strings"
"time"
+ "github.com/apache/iotdb-client-go/common"
+
"github.com/apache/iotdb-client-go/client"
)
@@ -465,9 +466,9 @@ func deleteData() {
func insertTablet() {
if tablet, err := createTablet(12); err == nil {
- status, err := session.InsertTablet(tablet, false)
+ err = session.InsertTablet(tablet, false)
tablet.Reset()
- checkError(status, err)
+ checkError(err)
} else {
log.Fatal(err)
}
@@ -475,9 +476,9 @@ func insertTablet() {
func insertAlignedTablet() {
if tablet, err := createTablet(12); err == nil {
- status, err := session.InsertAlignedTablet(tablet, false)
+ err = session.InsertAlignedTablet(tablet, false)
tablet.Reset()
- checkError(status, err)
+ checkError(err)
} else {
log.Fatal(err)
}
@@ -642,14 +643,8 @@ func executeBatchStatement() {
}
}
-func checkError(status *common.TSStatus, err error) {
+func checkError(err error) {
if err != nil {
log.Fatal(err)
}
-
- if status != nil {
- if err = client.VerifySuccess(status); err != nil {
- log.Println(err)
- }
- }
}
diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go
index 6f52ffe..2dfb5bb 100644
--- a/example/session_pool/session_pool_example.go
+++ b/example/session_pool/session_pool_example.go
@@ -22,7 +22,6 @@ package main
import (
"flag"
"fmt"
- "github.com/apache/iotdb-client-go/common"
"log"
"math/rand"
"strings"
@@ -410,9 +409,9 @@ func insertTablet() {
defer sessionPool.PutBack(session)
if err == nil {
if tablet, err := createTablet(12); err == nil {
- status, err := session.InsertTablet(tablet, false)
+ err := session.InsertTablet(tablet, false)
tablet.Reset()
- checkError(status, err)
+ checkError(err)
} else {
log.Fatal(err)
}
@@ -425,9 +424,9 @@ func insertAlignedTablet() {
defer sessionPool.PutBack(session)
if err == nil {
if tablet, err := createTablet(12); err == nil {
- status, err := session.InsertAlignedTablet(tablet,
false)
+ err := session.InsertAlignedTablet(tablet, false)
tablet.Reset()
- checkError(status, err)
+ checkError(err)
} else {
log.Fatal(err)
}
@@ -725,14 +724,8 @@ func printDataSet2(sds *client.SessionDataSet) {
}
}
-func checkError(status *common.TSStatus, err error) {
+func checkError(err error) {
if err != nil {
log.Fatal(err)
}
-
- if status != nil {
- if err = client.VerifySuccess(status); err != nil {
- log.Println(err)
- }
- }
}
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index a225546..be83bbf 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -28,8 +28,6 @@ import (
"testing"
"time"
- "github.com/apache/iotdb-client-go/common"
-
"github.com/apache/iotdb-client-go/client"
"github.com/stretchr/testify/suite"
)
@@ -61,20 +59,17 @@ func (s *e2eTestSuite) TearDownSuite() {
}
func (s *e2eTestSuite) SetupTest() {
- r, err := s.session.SetStorageGroup("root.tsg1")
- s.checkError(r, err)
+ err := s.session.SetStorageGroup("root.tsg1")
+ s.checkError(err)
}
func (s *e2eTestSuite) TearDownTest() {
- r, err := s.session.DeleteStorageGroup("root.tsg1")
- s.checkError(r, err)
+ err := s.session.DeleteStorageGroup("root.tsg1")
+ s.checkError(err)
}
-func (s *e2eTestSuite) checkError(status *common.TSStatus, err error) {
+func (s *e2eTestSuite) checkError(err error) {
s.Require().NoError(err)
- if status != nil {
- s.Require().NoError(client.VerifySuccess(status))
- }
}
func (s *e2eTestSuite) Test_NonQuery() {
@@ -174,7 +169,7 @@ func (s *e2eTestSuite) Test_InsertRecordsWithWrongType() {
values = [][]interface{}{{100.0, true}, {"aaa"}}
timestamp = []int64{1, 2}
)
- _, err := s.session.InsertRecords(deviceId, measurements, dataTypes,
values, timestamp)
+ err := s.session.InsertRecords(deviceId, measurements, dataTypes,
values, timestamp)
assert := s.Require()
assert.NotNil(err)
assert.Equal("measurement s1 values[0] 100(float64) must be bool",
err.Error())
@@ -255,8 +250,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTablet() {
var timeseries = []string{"root.ln.device1.**"}
s.session.DeleteTimeseries(timeseries)
if tablet, err := createTablet(12); err == nil {
- status, err := s.session.InsertAlignedTablet(tablet, false)
- s.checkError(status, err)
+ err := s.session.InsertAlignedTablet(tablet, false)
+ s.checkError(err)
tablet.Reset()
} else {
log.Fatal(err)
@@ -277,8 +272,8 @@ func (s *e2eTestSuite)
Test_InsertAlignedTabletWithNilValue() {
var timeseries = []string{"root.ln.device1.**"}
s.session.DeleteTimeseries(timeseries)
if tablet, err := createTabletWithNil(12); err == nil {
- status, err := s.session.InsertAlignedTablet(tablet, false)
- s.checkError(status, err)
+ err := s.session.InsertAlignedTablet(tablet, false)
+ s.checkError(err)
tablet.Reset()
} else {
log.Fatal(err)
@@ -499,8 +494,8 @@ func (s *e2eTestSuite) Test_QueryAllDataType() {
tablet.SetValueAt("string", 9, 0)
tablet.RowSize = 1
- r, err := s.session.InsertAlignedTablet(tablet, true)
- s.checkError(r, err)
+ err = s.session.InsertAlignedTablet(tablet, true)
+ s.checkError(err)
sessionDataSet, err := s.session.ExecuteQueryStatement("select s0, s1,
s2, s3, s4, s5, s6, s7, s8, s9 from root.tsg1.d1 limit 1", nil)
for {