joao-r-reis commented on code in PR #1868:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1868#discussion_r2044793118
##########
session.go:
##########
@@ -743,33 +736,47 @@ func (s *Session) routingKeyInfo(ctx context.Context,
stmt string, keyspace stri
return routingKeyInfo, nil
}
-func (b *Batch) execute(ctx context.Context, conn *Conn) *Iter {
- return conn.executeBatch(ctx, b)
-}
-
// Exec executes a batch operation and returns nil if successful
// otherwise an error is returned describing the failure.
func (b *Batch) Exec() error {
- iter := b.session.executeBatch(b)
+ iter := b.session.executeBatch(b, nil)
+ return iter.Close()
+}
+
+// ExecContext executes a batch operation with the provided context and
returns nil if successful
+// otherwise an error is returned describing the failure.
+func (b *Batch) ExecContext(ctx context.Context) error {
+ iter := b.session.executeBatch(b, ctx)
return iter.Close()
}
-func (s *Session) executeBatch(batch *Batch) *Iter {
+// Iter executes a batch operation and returns an Iter object
+// that can be used to access properties related to the execution like
Iter.Attempts and Iter.Latency
+func (b *Batch) Iter() *Iter { return b.session.executeBatch(b, nil) }
+
+// Iter executes a batch operation with the provided context and returns an
Iter object
+// that can be used to access properties related to the execution like
Iter.Attempts and Iter.Latency
+func (b *Batch) IterContext(ctx context.Context) *Iter {
+ return b.session.executeBatch(b, ctx)
+}
Review Comment:
done
##########
query_executor.go:
##########
@@ -222,16 +238,452 @@ func (q *queryExecutor) do(ctx context.Context, qry
ExecutableQuery, hostIter Ne
}
if lastErr != nil {
- return &Iter{err: lastErr}
+ return newErrIter(lastErr, qry.getQueryMetrics(),
qry.Keyspace(), qry.getRoutingInfo(), qry.getKeyspaceFunc())
}
- return &Iter{err: ErrNoConnections}
+ return newErrIter(ErrNoConnections, qry.getQueryMetrics(),
qry.Keyspace(), qry.getRoutingInfo(), qry.getKeyspaceFunc())
}
-func (q *queryExecutor) run(ctx context.Context, qry ExecutableQuery, hostIter
NextHost, results chan<- *Iter) {
+func (q *queryExecutor) run(ctx context.Context, qry internalRequest, hostIter
NextHost, results chan<- *Iter) {
select {
case results <- q.do(ctx, qry, hostIter):
case <-ctx.Done():
}
- qry.releaseAfterExecution()
+}
+
+type queryOptions struct {
+ stmt string
+
+ // Paging
+ pageSize int
+ disableAutoPage bool
+
+ // Monitoring
+ trace Tracer
+ observer QueryObserver
+
+ // Parameters
+ values []interface{}
+ binding func(q *QueryInfo) ([]interface{}, error)
+
+ // Timestamp
+ defaultTimestamp bool
+ defaultTimestampValue int64
+
+ // Consistency
+ serialCons SerialConsistency
+
+ // Protocol flag
+ disableSkipMetadata bool
+
+ customPayload map[string][]byte
+ prefetch float64
+ rt RetryPolicy
+ spec SpeculativeExecutionPolicy
+ context context.Context
+ idempotent bool
+ keyspace string
+ skipPrepare bool
+ routingKey []byte
+ nowInSecondsValue *int
+ hostID string
+
+ // getKeyspace is field so that it can be overriden in tests
+ getKeyspace func() string
+}
+
+func newQueryOptions(q *Query, ctx context.Context) *queryOptions {
+ var newPageState, newRoutingKey []byte
+ if q.initialPageState != nil {
+ pageState := q.initialPageState
+ newPageState = make([]byte, len(pageState))
+ copy(newPageState, pageState)
+ }
+ if q.routingKey != nil {
+ routingKey := q.routingKey
+ newRoutingKey = make([]byte, len(routingKey))
+ copy(newRoutingKey, routingKey)
+ }
+ if ctx == nil {
+ ctx = q.Context()
+ }
+ return &queryOptions{
+ stmt: q.stmt,
+ values: q.values,
+ pageSize: q.pageSize,
+ prefetch: q.prefetch,
+ trace: q.trace,
+ observer: q.observer,
+ rt: q.rt,
+ spec: q.spec,
+ binding: q.binding,
+ serialCons: q.serialCons,
+ defaultTimestamp: q.defaultTimestamp,
+ defaultTimestampValue: q.defaultTimestampValue,
+ disableSkipMetadata: q.disableSkipMetadata,
+ context: ctx,
+ idempotent: q.idempotent,
+ customPayload: q.customPayload,
+ disableAutoPage: q.disableAutoPage,
+ skipPrepare: q.skipPrepare,
+ routingKey: newRoutingKey,
+ getKeyspace: q.getKeyspace,
+ nowInSecondsValue: q.nowInSecondsValue,
+ keyspace: q.keyspace,
+ hostID: q.hostID,
+ }
+}
+
+type internalQuery struct {
+ originalQuery *Query
+ qryOpts *queryOptions
+ pageState []byte
+ metrics *queryMetrics
+ refCount uint32
+ conn *Conn
+ consistency uint32
+ session *Session
+ routingInfo *queryRoutingInfo
+}
+
+func newInternalQuery(q *Query, ctx context.Context) *internalQuery {
+ var newPageState []byte
+ if q.initialPageState != nil {
+ pageState := q.initialPageState
+ newPageState = make([]byte, len(pageState))
+ copy(newPageState, pageState)
+ }
+ return &internalQuery{
+ originalQuery: q,
+ qryOpts: newQueryOptions(q, ctx),
+ metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
+ consistency: uint32(q.initialConsistency),
+ pageState: newPageState,
+ conn: nil,
+ session: q.session,
+ routingInfo: &queryRoutingInfo{},
+ }
+}
+
+// Attempts returns the number of times the query was executed.
+func (q *internalQuery) Attempts() int {
+ return q.metrics.attempts()
+}
+
+func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter
*Iter, host *HostInfo) {
+ latency := end.Sub(start)
+ attempt, metricsForHost := q.metrics.attempt(1, latency, host,
q.qryOpts.observer != nil)
+
+ if q.qryOpts.observer != nil {
+ q.qryOpts.observer.ObserveQuery(q.qryOpts.context,
ObservedQuery{
+ Keyspace: keyspace,
+ Statement: q.qryOpts.stmt,
+ Values: q.qryOpts.values,
+ Start: start,
+ End: end,
+ Rows: iter.numRows,
+ Host: host,
+ Metrics: metricsForHost,
+ Err: iter.err,
+ Attempt: attempt,
+ Query: q.originalQuery,
+ })
+ }
+}
+
+func (q *internalQuery) execute(ctx context.Context, conn *Conn) *Iter {
+ return conn.executeQuery(ctx, q)
+}
+
+func (q *internalQuery) retryPolicy() RetryPolicy {
+ return q.qryOpts.rt
+}
+
+func (q *internalQuery) speculativeExecutionPolicy()
SpeculativeExecutionPolicy {
+ return q.qryOpts.spec
+}
+
+func (q *internalQuery) GetRoutingKey() ([]byte, error) {
+ if q.qryOpts.routingKey != nil {
+ return q.qryOpts.routingKey, nil
+ }
+
+ if q.qryOpts.binding != nil && len(q.qryOpts.values) == 0 {
+ // If this query was created using session.Bind we wont have
the query
+ // values yet, so we have to pass down to the next policy.
+ // TODO: Remove this and handle this case
+ return nil, nil
+ }
+
+ // try to determine the routing key
+ routingKeyInfo, err := q.session.routingKeyInfo(q.Context(),
q.qryOpts.stmt, q.qryOpts.keyspace)
+ if err != nil {
+ return nil, err
+ }
+
+ if routingKeyInfo != nil {
+ q.routingInfo.mu.Lock()
+ q.routingInfo.keyspace = routingKeyInfo.keyspace
+ q.routingInfo.table = routingKeyInfo.table
+ q.routingInfo.mu.Unlock()
+ }
+ return createRoutingKey(routingKeyInfo, q.qryOpts.values)
+}
+
+func (q *internalQuery) Keyspace() string {
+ if q.qryOpts.getKeyspace != nil {
+ return q.qryOpts.getKeyspace()
+ }
+
+ qrKs := q.routingInfo.getKeyspace()
+ if qrKs != "" {
+ return qrKs
+ }
+ if q.qryOpts.keyspace != "" {
+ return q.qryOpts.keyspace
+ }
+
+ if q.session == nil {
+ return ""
+ }
+ // TODO(chbannis): this should be parsed from the query or we should let
+ // this be set by users.
+ return q.session.cfg.Keyspace
+}
+
+func (q *internalQuery) Table() string {
+ return q.routingInfo.getTable()
+}
+
+func (q *internalQuery) IsIdempotent() bool {
+ return q.qryOpts.idempotent
+}
+
+func (q *internalQuery) getQueryMetrics() *queryMetrics {
+ return q.metrics
+}
+
+func (q *internalQuery) SetConsistency(c Consistency) {
+ atomic.StoreUint32(&q.consistency, uint32(c))
+}
+
+func (q *internalQuery) GetConsistency() Consistency {
+ return Consistency(atomic.LoadUint32(&q.consistency))
+}
+
+func (q *internalQuery) Context() context.Context {
+ return q.qryOpts.context
+}
+
+func (q *internalQuery) Statement() Statement {
+ return q.originalQuery
+}
+
+func (q *internalQuery) GetHostID() string {
+ return q.qryOpts.hostID
+}
+
+func (q *internalQuery) getRoutingInfo() *queryRoutingInfo {
+ return q.routingInfo
+}
+
+func (q *internalQuery) getKeyspaceFunc() func() string {
+ return q.qryOpts.getKeyspace
+}
+
+type batchOptions struct {
+ trace Tracer
+ observer BatchObserver
+
+ bType BatchType
+ entries []BatchEntry
+
+ defaultTimestamp bool
+ defaultTimestampValue int64
+
+ serialCons SerialConsistency
+
+ customPayload map[string][]byte
+ rt RetryPolicy
+ spec SpeculativeExecutionPolicy
+ context context.Context
+ keyspace string
+ idempotent bool
+ routingKey []byte
+ nowInSeconds *int
+}
+
+func newBatchOptions(b *Batch, ctx context.Context) *batchOptions {
+ // make a new array so if user keeps appending entries on the Batch
object it doesn't affect this execution
+ newEntries := make([]BatchEntry, len(b.Entries))
+ for i, e := range b.Entries {
+ newEntries[i] = e
+ }
+ var newRoutingKey []byte
+ if b.routingKey != nil {
+ routingKey := b.routingKey
+ newRoutingKey = make([]byte, len(routingKey))
+ copy(newRoutingKey, routingKey)
+ }
+ if ctx == nil {
+ ctx = b.Context()
+ }
+ return &batchOptions{
+ bType: b.Type,
+ entries: newEntries,
+ customPayload: b.CustomPayload,
+ rt: b.rt,
+ spec: b.spec,
+ trace: b.trace,
+ observer: b.observer,
+ serialCons: b.serialCons,
+ defaultTimestamp: b.defaultTimestamp,
+ defaultTimestampValue: b.defaultTimestampValue,
+ context: ctx,
+ keyspace: b.Keyspace(),
+ idempotent: b.IsIdempotent(),
+ routingKey: newRoutingKey,
+ nowInSeconds: b.nowInSeconds,
+ }
+}
+
+type internalBatch struct {
+ originalBatch *Batch
+ batchOpts *batchOptions
+ metrics *queryMetrics
+ consistency uint32
+ routingInfo *queryRoutingInfo
+ session *Session
+}
+
+func newInternalBatch(batch *Batch, ctx context.Context) *internalBatch {
+ return &internalBatch{
+ originalBatch: batch,
+ batchOpts: newBatchOptions(batch, ctx),
+ metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
+ routingInfo: &queryRoutingInfo{},
+ session: batch.session,
+ }
+}
Review Comment:
nice catch, done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]