This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new ad763ce8e57 [Feature](WIP) Add fetch meta and fix rewrite tvf problem
(#55986)
ad763ce8e57 is described below
commit ad763ce8e57a1f790d78117ca0756f79a4901418
Author: wudi <[email protected]>
AuthorDate: Fri Sep 12 22:15:05 2025 +0800
[Feature](WIP) Add fetch meta and fix rewrite tvf problem (#55986)
### What problem does this PR solve?
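Add remote metadata fetching for streaming insert jobs (listing the S3 source to record the latest remote file), show the synced and remote offsets in the job information, and fix the rewriting of TVF parameters so that each task reads its own batch of files.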
---
.../insert/streaming/StreamingInsertJob.java | 64 +++++++++++----
.../insert/streaming/StreamingInsertTask.java | 32 ++++----
.../doris/job/offset/SourceOffsetProvider.java | 25 ++++--
.../job/offset/s3/S3SourceOffsetProvider.java | 91 ++++++++++++++--------
.../commands/insert/InsertIntoTableCommand.java | 47 ++---------
5 files changed, 146 insertions(+), 113 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 84e42c6e1f6..a4e05b650e3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -56,6 +56,7 @@ import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
+import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import java.io.DataOutput;
@@ -65,6 +66,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+@Log4j2
public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask,
Map<Object, Object>> implements
TxnStateChangeCallback, GsonPostProcessable {
private final long dbId;
@@ -83,14 +85,16 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@SerializedName("jp")
private StreamingJobProperties jobProperties;
@Getter
- @SerializedName("tt")
+ @SerializedName("tvf")
private String tvfType;
+ private Map<String, String> originTvfProps;
@Getter
StreamingInsertTask runningStreamTask;
SourceOffsetProvider offsetProvider;
@Setter
@Getter
private long lastScheduleTaskTimestamp = -1L;
+ private InsertIntoTableCommand baseCommand;
public StreamingInsertJob(String jobName,
JobStatus jobStatus,
@@ -105,17 +109,28 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
jobConfig, createTimeMs, executeSql);
this.dbId = ConnectContext.get().getCurrentDbId();
this.jobProperties = jobProperties;
- this.tvfType = parseTvfType();
- this.offsetProvider =
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
- this.offsetProvider.init(getExecuteSql(), jobProperties);
+ init();
}
- private String parseTvfType() {
- NereidsParser parser = new NereidsParser();
- InsertIntoTableCommand command = (InsertIntoTableCommand)
parser.parseSingle(getExecuteSql());
- UnboundTVFRelation firstTVF = command.getFirstTVF();
- Preconditions.checkNotNull(firstTVF, "Only support insert sql with
tvf");
- return firstTVF.getFunctionName();
+ private void init() {
+ try {
+ UnboundTVFRelation currentTvf = getCurrentTvf();
+ this.originTvfProps = currentTvf.getProperties().getMap();
+ this.offsetProvider =
SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
+ } catch (Exception ex) {
+ log.warn("init streaming insert job failed, sql: {}",
getExecuteSql(), ex);
+ throw new RuntimeException("init streaming insert job failed, sql:
" + getExecuteSql(), ex);
+ }
+ }
+
+ private UnboundTVFRelation getCurrentTvf() throws Exception {
+ if (baseCommand == null) {
+ this.baseCommand = (InsertIntoTableCommand) new
NereidsParser().parseSingle(getExecuteSql());
+ }
+ List<UnboundTVFRelation> allTVFRelation =
baseCommand.getAllTVFRelation();
+ Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support
one source in insert streaming job");
+ UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
+ return unboundTVFRelation;
}
@Override
@@ -155,7 +170,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
protected void fetchMeta() {
- offsetProvider.fetchRemoteMeta();
+ try {
+ if (originTvfProps == null) {
+ this.originTvfProps = getCurrentTvf().getProperties().getMap();
+ }
+ offsetProvider.fetchRemoteMeta(originTvfProps);
+ } catch (Exception ex) {
+ log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
+ }
}
public boolean needScheduleTask() {
@@ -228,8 +250,19 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
trow.addToColumnValue(new TCell().setStringVal(getComment()));
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+
+ if (offsetProvider != null && offsetProvider.getSyncOffset() != null) {
+ trow.addToColumnValue(new
TCell().setStringVal(offsetProvider.getSyncOffset()));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+ if (offsetProvider != null && offsetProvider.getRemoteOffset() !=
null) {
+ trow.addToColumnValue(new
TCell().setStringVal(offsetProvider.getRemoteOffset()));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+
+ trow.addToColumnValue(new
TCell().setStringVal(offsetProvider.getRemoteOffset()));
trow.addToColumnValue(new TCell().setStringVal(
jobStatistic == null ? FeConstants.null_string :
jobStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ?
FeConstants.null_string : failMsg.getMsg()));
@@ -324,9 +357,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Override
public void gsonPostProcess() throws IOException {
- if (offsetProvider == null && jobProperties != null && tvfType !=
null) {
- this.offsetProvider =
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
- // this.offsetProvider.init(getExecuteSql(), jobProperties);
+ if (offsetProvider == null) {
+ offsetProvider =
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 1227624fc8f..73ac267e678 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -57,15 +57,15 @@ public class StreamingInsertTask {
private Long startTimeMs;
private Long finishTimeMs;
private String sql;
- private SourceOffsetProvider offsetProvider;
private StmtExecutor stmtExecutor;
- private InsertIntoTableCommand command;
+ private InsertIntoTableCommand taskCommand;
private String currentDb;
private UserIdentity userIdentity;
private ConnectContext ctx;
private Offset runningOffset;
private AtomicBoolean isCanceled = new AtomicBoolean(false);
private StreamingJobProperties jobProperties;
+ SourceOffsetProvider offsetProvider;
public StreamingInsertTask(long jobId,
long taskId,
@@ -73,13 +73,13 @@ public class StreamingInsertTask {
SourceOffsetProvider offsetProvider,
String currentDb,
StreamingJobProperties jobProperties,
- UserIdentity userIdentity) {
+ UserIdentity userIdentity) {
this.jobId = jobId;
this.taskId = taskId;
this.sql = sql;
- this.offsetProvider = offsetProvider;
this.userIdentity = userIdentity;
this.currentDb = currentDb;
+ this.offsetProvider = offsetProvider;
this.jobProperties = jobProperties;
this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
this.createTimeMs = System.currentTimeMillis();
@@ -106,7 +106,7 @@ public class StreamingInsertTask {
}
}
- private void before() throws JobException {
+ private void before() throws Exception {
this.status = TaskStatus.RUNNING;
this.startTimeMs = System.currentTimeMillis();
@@ -117,12 +117,12 @@ public class StreamingInsertTask {
ctx.setSessionVariable(jobProperties.getSessionVariable());
StatementContext statementContext = new StatementContext();
ctx.setStatementContext(statementContext);
- offsetProvider.init(sql, jobProperties);
- this.runningOffset = offsetProvider.getNextOffset();
- this.command = offsetProvider.rewriteTvfParams(runningOffset);
- this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER +
getTaskId()));
- this.command.setJobId(getTaskId());
- stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(command,
ctx.getStatementContext()));
+
+ this.runningOffset = offsetProvider.getNextOffset(jobProperties,
jobProperties.getProperties());
+ this.taskCommand = offsetProvider.rewriteTvfParams(sql, runningOffset);
+ this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER
+ getTaskId()));
+ this.taskCommand.setJobId(getTaskId());
+ this.stmtExecutor = new StmtExecutor(ctx, new
LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
}
private void run() throws JobException {
@@ -134,7 +134,7 @@ public class StreamingInsertTask {
log.info("task has been canceled, task id is {}",
getTaskId());
return;
}
- command.run(ctx, stmtExecutor);
+ taskCommand.run(ctx, stmtExecutor);
if (ctx.getState().getStateType() ==
QueryState.MysqlStateType.OK) {
return;
} else {
@@ -142,13 +142,13 @@ public class StreamingInsertTask {
}
log.error(
"streaming insert failed with {}, reason {}, to retry",
- command.getLabelName(),
+ taskCommand.getLabelName(),
errMsg);
if (retry == MAX_RETRY) {
errMsg = "reached max retry times, failed with" + errMsg;
}
} catch (Exception e) {
- log.warn("execute insert task error, label is {},offset is
{}", command.getLabelName(),
+ log.warn("execute insert task error, label is {},offset is
{}", taskCommand.getLabelName(),
runningOffset.toJson(), e);
errMsg = Util.getRootCauseMessage(e);
}
@@ -209,8 +209,8 @@ public class StreamingInsertTask {
if (null != stmtExecutor) {
stmtExecutor = null;
}
- if (null != command) {
- command = null;
+ if (null != taskCommand) {
+ taskCommand = null;
}
if (null != ctx) {
ctx = null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 49a272c664f..d9b2264d5b6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -20,16 +20,13 @@ package org.apache.doris.job.offset;
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import java.util.Map;
+
/**
* Interface for managing offsets and metadata of a data source.
*/
public interface SourceOffsetProvider {
- /**
- * init
- */
- void init(String executeSql, StreamingJobProperties jobProperties);
-
/**
* Get source type, e.g. s3, kafka
* @return
@@ -40,7 +37,7 @@ public interface SourceOffsetProvider {
* Get next offset to consume
* @return
*/
- Offset getNextOffset();
+ Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String>
properties);
/**
* Get current offset
@@ -48,12 +45,24 @@ public interface SourceOffsetProvider {
*/
Offset getCurrentOffset();
+ /**
+ * Get sync offset to show
+ * @return
+ */
+ String getSyncOffset();
+
+ /**
+ * Get remote offset
+ * @return
+ */
+ String getRemoteOffset();
+
/**
* Rewrite the TVF parameters in the SQL based on the current offset.
* @param nextOffset
* @return rewritten InsertIntoTableCommand
*/
- InsertIntoTableCommand rewriteTvfParams(Offset nextOffset);
+ InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset
nextOffset) throws Exception;
/**
* Update the offset of the source.
@@ -64,7 +73,7 @@ public interface SourceOffsetProvider {
/**
* Fetch remote meta information, such as listing files in S3 or getting
latest offsets in Kafka.
*/
- void fetchRemoteMeta();
+ void fetchRemoteMeta(Map<String, String> properties) throws Exception;
/**
* Whether there is more data to consume
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index b858a1e6431..a95f4af65c9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -17,51 +17,35 @@
package org.apache.doris.job.offset.s3;
-import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
-import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.functions.table.S3;
+import org.apache.doris.nereids.trees.plans.Plan;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
+import org.apache.doris.qe.ConnectContext;
+import com.google.common.collect.ImmutableList;
import lombok.extern.log4j.Log4j2;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
@Log4j2
public class S3SourceOffsetProvider implements SourceOffsetProvider {
- String executeSql;
S3Offset currentOffset;
String maxRemoteEndFile;
- StreamingJobProperties jobProperties;
- NereidsParser parser;
- String filePath;
- StorageProperties storageProperties;
-
- @Override
- public void init(String executeSql, StreamingJobProperties jobProperties) {
- //todo: check is already init
- this.executeSql = executeSql;
- this.jobProperties = jobProperties;
- this.parser = new NereidsParser();
- InsertIntoTableCommand command = (InsertIntoTableCommand)
parser.parseSingle(executeSql);
- UnboundTVFRelation firstTVF = command.getFirstTVF();
- Map<String, String> properties = firstTVF.getProperties().getMap();
- try {
- this.storageProperties =
StorageProperties.createPrimary(properties);
- String uri = storageProperties.validateAndGetUri(properties);
- this.filePath = storageProperties.validateAndNormalizeUri(uri);
- } catch (UserException e) {
- throw new RuntimeException("Failed check storage props, " +
e.getMessage(), e);
- }
- }
+ InsertIntoTableCommand baseCommand;
@Override
public String getSourceType() {
@@ -69,13 +53,17 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
}
@Override
- public S3Offset getNextOffset() {
+ public S3Offset getNextOffset(StreamingJobProperties jobProps, Map<String,
String> properties) {
S3Offset offset = new S3Offset();
List<String> rfiles = new ArrayList<>();
String startFile = currentOffset == null ? null :
currentOffset.endFile;
+ String filePath = null;
+ StorageProperties storageProperties =
StorageProperties.createPrimary(properties);
try (RemoteFileSystem fileSystem =
FileSystemFactory.get(storageProperties)) {
+ String uri = storageProperties.validateAndGetUri(properties);
+ filePath = storageProperties.validateAndNormalizeUri(uri);
maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles,
startFile,
- jobProperties.getS3BatchFiles(),
jobProperties.getS3BatchSize());
+ jobProps.getS3BatchFiles(), jobProps.getS3BatchSize());
offset.setStartFile(startFile);
offset.setEndFile(rfiles.get(rfiles.size() - 1));
offset.setFileLists(rfiles);
@@ -92,15 +80,41 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
}
@Override
- public InsertIntoTableCommand rewriteTvfParams(Offset runningOffset) {
+ public String getSyncOffset() {
+ if (currentOffset != null) {
+ return currentOffset.getEndFile();
+ }
+ return null;
+ }
+
+ @Override
+ public String getRemoteOffset() {
+ return maxRemoteEndFile;
+ }
+
+ @Override
+ public InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset
runningOffset) throws Exception {
S3Offset offset = (S3Offset) runningOffset;
Map<String, String> props = new HashMap<>();
String finalUri = "{" + String.join(",", offset.getFileLists()) + "}";
props.put("uri", finalUri);
- InsertIntoTableCommand command = (InsertIntoTableCommand)
parser.parseSingle(executeSql);
- //todo: command query plan is immutable
- //command.rewriteFirstTvfProperties(getSourceType(), props);
- return command;
+ if (baseCommand == null) {
+ this.baseCommand = (InsertIntoTableCommand) new
NereidsParser().parseSingle(executeSql);
+ this.baseCommand.initPlan(ConnectContext.get(),
ConnectContext.get().getExecutor(), false);
+ }
+
+ // rewrite plan
+ Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> {
+ if (plan instanceof LogicalTVFRelation) {
+ LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan;
+ LogicalTVFRelation newRvfRel = new LogicalTVFRelation(
+ originTvfRel.getRelationId(), new S3(new
Properties(props)), ImmutableList.of());
+ return newRvfRel;
+ }
+ return plan;
+ });
+ return new InsertIntoTableCommand((LogicalPlan) rewritePlan,
Optional.empty(), Optional.empty(),
+ Optional.empty(), true, Optional.empty());
}
@Override
@@ -109,8 +123,17 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
}
@Override
- public void fetchRemoteMeta() {
- // list object
+ public void fetchRemoteMeta(Map<String, String> properties) throws
Exception {
+ StorageProperties storageProperties =
StorageProperties.createPrimary(properties);
+ String startFile = currentOffset == null ? null :
currentOffset.endFile;
+ try (RemoteFileSystem fileSystem =
FileSystemFactory.get(storageProperties)) {
+ String uri = storageProperties.validateAndGetUri(properties);
+ String filePath = storageProperties.validateAndNormalizeUri(uri);
+ maxRemoteEndFile = fileSystem.globListWithLimit(filePath, new
ArrayList<>(), startFile,
+ 1, 1);
+ } catch (Exception e) {
+ throw e;
+ }
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 04a1db682c5..6736b16f8af 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -79,11 +79,10 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -540,51 +539,21 @@ public class InsertIntoTableCommand extends Command
implements NeedAuditEncrypti
}
// todo: add ut
- public UnboundTVFRelation getFirstTVF() {
- return getFirstTvfInPlan(getLogicalQuery());
+ public List<UnboundTVFRelation> getAllTVFRelation() {
+ List<UnboundTVFRelation> tvfs = new ArrayList<>();
+ findAllTVFInPlan(getLogicalQuery(), tvfs);
+ return tvfs;
}
- private UnboundTVFRelation getFirstTvfInPlan(LogicalPlan plan) {
+ private void findAllTVFInPlan(LogicalPlan plan, List<UnboundTVFRelation>
tvfs) {
if (plan instanceof UnboundTVFRelation) {
UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
- return tvfRelation;
+ tvfs.add(tvfRelation);
}
for (Plan child : plan.children()) {
if (child instanceof LogicalPlan) {
- UnboundTVFRelation result = getFirstTvfInPlan((LogicalPlan)
child);
- if (result != null) {
- return result;
- }
- }
- }
- return null;
- }
-
- // todo: add ut
- public void rewriteFirstTvfProperties(String functionName, Map<String,
String> props) {
- AtomicBoolean found = new AtomicBoolean(false);
- rewriteFirstTvfInPlan(originLogicalQuery, functionName, props, found);
- }
-
- private void rewriteFirstTvfInPlan(LogicalPlan plan,
- String functionName, Map<String, String> props, AtomicBoolean
found) {
- if (found.get()) {
- return;
- }
-
- if (plan instanceof UnboundTVFRelation) {
- UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
- if (functionName.equalsIgnoreCase(tvfRelation.getFunctionName())) {
- tvfRelation.getProperties().getMap().putAll(props);
- found.set(true);
- return;
- }
- }
-
- for (Plan child : plan.children()) {
- if (child instanceof LogicalPlan) {
- rewriteFirstTvfInPlan((LogicalPlan) child, functionName,
props, found);
+ findAllTVFInPlan((LogicalPlan) child, tvfs);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]