This is an automated email from the ASF dual-hosted git repository. karp pushed a commit to branch snapshot-1.0.4 in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 9d3ae58bf1f301a7f31d04256ddbe04ec94899b5 Author: 维章 <[email protected]> AuthorDate: Mon May 23 11:58:39 2022 +0800 merge 1.0 --- pom.xml | 1 + rocketmq-streams-connectors/pom.xml | 47 ++++ .../streams/connectors/IBoundedSource.java | 32 +++ .../streams/connectors/IBoundedSourceReader.java | 26 ++ .../streams/connectors/IScheduleCallback.java | 24 ++ .../connectors/balance/AbstractBalance.java | 207 ++++++++++++++ .../streams/connectors/balance/IBalanceTask.java | 24 ++ .../streams/connectors/balance/ISourceBalance.java | 60 ++++ .../streams/connectors/balance/SplitChanged.java | 55 ++++ .../connectors/balance/impl/LeaseBalanceImpl.java | 144 ++++++++++ .../streams/connectors/model/PullMessage.java | 50 ++++ .../streams/connectors/model/ReaderStatus.java | 120 ++++++++ .../streams/connectors/reader/DBScanReader.java | 269 ++++++++++++++++++ .../streams/connectors/reader/ISplitReader.java | 96 +++++++ .../connectors/reader/SplitCloseFuture.java | 83 ++++++ .../connectors/source/AbstractPullSource.java | 312 +++++++++++++++++++++ .../source/CycleDynamicMultipleDBScanSource.java | 213 ++++++++++++++ .../source/DynamicMultipleDBScanSource.java | 190 +++++++++++++ .../streams/connectors/source/IPullSource.java | 60 ++++ .../connectors/source/MutilBatchTaskSource.java | 158 +++++++++++ .../streams/connectors/source/SourceInstance.java | 37 +++ .../source/filter/AbstractPatternFilter.java | 38 +++ .../source/filter/BoundedPatternFilter.java | 53 ++++ .../source/filter/CyclePatternFilter.java | 173 ++++++++++++ .../connectors/source/filter/CyclePeriod.java | 222 +++++++++++++++ .../connectors/source/filter/CycleSchedule.java | 236 ++++++++++++++++ .../source/filter/CycleScheduleFilter.java | 37 +++ .../source/filter/DataFormatPatternFilter.java | 106 +++++++ .../connectors/source/filter/PatternFilter.java | 41 +++ .../window/offset/WindowMaxValueProcessor.java | 2 +- 30 files changed, 3115 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 
9da26dcd..7caa0cbc 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ <module>rocketmq-streams-channel-http</module> <module>rocketmq-streams-state</module> <module>rocketmq-streams-examples</module> + <module>rocketmq-streams-connectors</module> <module>rocketmq-streams-channel-syslog</module> <module>rocketmq-streams-channel-es</module> <module>rocketmq-streams-channel-kafka</module> diff --git a/rocketmq-streams-connectors/pom.xml b/rocketmq-streams-connectors/pom.xml new file mode 100644 index 00000000..d544e3d5 --- /dev/null +++ b/rocketmq-streams-connectors/pom.xml @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams</artifactId> + <version>1.0.2-SNAPSHOT</version> + </parent> + <artifactId>rocketmq-streams-connectors</artifactId> + <packaging>jar</packaging> + <name>ROCKETMQ STREAMS :: connectors</name> + + <dependencies> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-lease</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-schedule</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-commons</artifactId> + </dependency> + </dependencies> +</project> diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSource.java new file mode 100644 index 00000000..1a383433 --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSource.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors; + +import org.apache.rocketmq.streams.common.channel.split.ISplit; + +/** + * @description + */ +public interface IBoundedSource{ + + /** + * reader完成时调用 + * @param iSplit + */ + void boundedFinishedCallBack(ISplit iSplit); + +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSourceReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSourceReader.java new file mode 100644 index 00000000..03995c31 --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSourceReader.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.streams.connectors; + +import org.apache.rocketmq.streams.common.interfaces.ILifeCycle; + +/** + * @description + */ +public interface IBoundedSourceReader extends ILifeCycle { + +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IScheduleCallback.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IScheduleCallback.java new file mode 100644 index 00000000..88fb2cfd --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IScheduleCallback.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.streams.connectors; + +import java.util.Date; + +public interface IScheduleCallback { + + boolean canFire(Date date); +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java new file mode 100644 index 00000000..5c2b266f --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.streams.connectors.balance; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.connectors.source.SourceInstance; + +public abstract class AbstractBalance implements ISourceBalance { + + protected int balanceCount = 0; + + @Override + public SplitChanged doBalance(List<ISplit> allSplits, List<ISplit> ownerSplits) { + balanceCount++; + heartBeat(); + List<SourceInstance> sourceInstances = fetchSourceInstances(); + List<ISplit> workingSplits = fetchWorkingSplits(allSplits); + SplitChanged splitChanged = getAdditionSplits(allSplits, sourceInstances, workingSplits, ownerSplits); + if (splitChanged != null) { + return splitChanged; + } + splitChanged = getRemoveSplits(allSplits, sourceInstances, workingSplits, ownerSplits); + return splitChanged; + } + + protected void heartBeat() { + holdLockSourceInstance(); + } + + /** + * get all dispatch splits + * + * @return + */ + protected abstract List<ISplit> fetchWorkingSplits(List<ISplit> allSplitS); + + /** + * get all instacne for the source + * + * @return + */ + protected abstract List<SourceInstance> fetchSourceInstances(); + + /** + * lock the source ,the lock is globel,only one source instance can get it in same time + * + * @return + */ + protected abstract boolean holdLockSourceInstance(); + + /** + * unlock + */ + protected abstract void unlockSourceInstance(); + + /** + * juge need add split,根据调度策略选择 + * 每次最大增加的分片数,根据调度次数决定 + * + * @param allSplits + * @param sourceInstances + * @param workingSplits + * @return + */ + protected SplitChanged getAdditionSplits(List<ISplit> allSplits, List<SourceInstance> sourceInstances, + List<ISplit> workingSplits, List<ISplit> ownerSplits) { + SplitChanged splitChanged = getChangedSplitCount(allSplits, sourceInstances, workingSplits.size(), ownerSplits.size()); + if (splitChanged == null) { + 
return null; + } + if (splitChanged.isNewSplit == false) { + return null; + } + if (splitChanged.splitCount <= 0) { + return null; + } + List<ISplit> noWorkingSplits = getNoWorkingSplits(allSplits, workingSplits); + List<ISplit> newSplits = new ArrayList<>(); + for (int i = 0; i < noWorkingSplits.size(); i++) { + boolean success = holdLockSplit(noWorkingSplits.get(i)); + if (success) { + newSplits.add(noWorkingSplits.get(i)); + if (newSplits.size() >= splitChanged.splitCount) { + break; + } + } + } + splitChanged.setChangedSplits(newSplits); + return splitChanged; + + } + + protected List<ISplit> getNoWorkingSplits(List<ISplit> allSplits, List<ISplit> workingSplits) { + Set<String> workingSplitIds = new HashSet<>(); + for (ISplit split : workingSplits) { + workingSplitIds.add(split.getQueueId()); + } + List<ISplit> splits = new ArrayList<>(); + for (ISplit split : allSplits) { + if (!workingSplitIds.contains(split.getQueueId())) { + splits.add(split); + } + } + return splits; + } + + /** + * 获取需要删除的分片 + * + * @param allSplits + * @param sourceInstances + * @param workingSplits + * @return + */ + protected SplitChanged getRemoveSplits(List<ISplit> allSplits, List<SourceInstance> sourceInstances, + List<ISplit> workingSplits, List<ISplit> ownerSplits) { + SplitChanged splitChanged = getChangedSplitCount(allSplits, sourceInstances, workingSplits.size(), ownerSplits.size()); + if (splitChanged == null) { + return null; + } + if (splitChanged.isNewSplit == true) { + return null; + } + + if (splitChanged.splitCount <= 0) { + return null; + } + //List<ISplit> ownerSplits=source.ownerSplits(); + List<ISplit> removeSplits = new ArrayList(); + for (int i = 0; i < splitChanged.splitCount; i++) { + removeSplits.add(ownerSplits.get(i)); + } + splitChanged.setChangedSplits(removeSplits); + return splitChanged; + } + + /** + * 获取需要变动的分片个数,新增或删除 + * 分配策略,只有有未分配的分片时才会分配新分片,为了减少分片切换,前面几次尽可能少分,后面越来越多 + * + * @return 需要本实例有变化的分配,新增或删除 + */ + protected SplitChanged 
getChangedSplitCount(List<ISplit> allSplits, List<SourceInstance> sourceInstances, + int splitCountInWorking, int ownerSplitCount) { + //int ownerSplitCount=source.ownerSplits().size(); + int instanceCount = sourceInstances.size(); + if (instanceCount == 0) { + instanceCount = 1; + } + int allSplitCount = allSplits.size(); + int minSplitCount = allSplitCount / instanceCount; + int maxSplitCount = minSplitCount + (allSplitCount % instanceCount == 0 ? 0 : 1); + //已经是最大分片数了 + if (ownerSplitCount == maxSplitCount) { + return null; + } + if (ownerSplitCount > maxSplitCount) { + int changeSplitCount = ownerSplitCount - maxSplitCount; + return new SplitChanged(changeSplitCount, false); + } + //分片已经全部在处理,当前分片也符合最小分片分配策略,不需要重新分配 + if (splitCountInWorking == allSplitCount && ownerSplitCount >= minSplitCount) { + return null; + } + //如果还有未分配的分片,且当前实例还有分片的可行性,则分配分片 + if (splitCountInWorking < allSplitCount && ownerSplitCount < maxSplitCount) { + int changeSplitCount = Math.min(maxSplitCount - ownerSplitCount, getMaxSplitCountInOneBalance()); + + return new SplitChanged(changeSplitCount, true); + } + return null; + } + + @Override + public int getBalanceCount() { + return balanceCount; + } + + /** + * 每次负载均衡最大的分片个数,目的是前几次,少分配分配,可能有实例在启动中,以免频繁切换分片,到后面实例都启动了,斤可能多分配分片 + * + * @return + */ + private int getMaxSplitCountInOneBalance() { + int balanceCount = getBalanceCount(); + return (int) Math.pow(2, balanceCount - 1); + } + +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/IBalanceTask.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/IBalanceTask.java new file mode 100644 index 00000000..5275fe48 --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/IBalanceTask.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.balance; + +/** + * @description + */ +public interface IBalanceTask extends Runnable { + +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java new file mode 100644 index 00000000..b012b323 --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.balance; + +import java.util.List; +import org.apache.rocketmq.streams.common.channel.split.ISplit; + +public interface ISourceBalance { + + /** + * 做负载均衡 + + * @return + */ + SplitChanged doBalance(List<ISplit> allSplits, List<ISplit> ownerSplits); + + /** + * 从启动开始,做了多少次均衡 + * @return + */ + int getBalanceCount(); + + + + boolean getRemoveSplitLock(); + + void unLockRemoveSplitLock(); + + /** + * lock the split and hold it util the instance is shutdown or remove split + * @param split + * @return + */ + boolean holdLockSplit(ISplit split); + + /** + * unlock split lock + * @param split + */ + void unlockSplit(ISplit split); + + + void setSourceIdentification(String sourceIdentification); + + +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java new file mode 100644 index 00000000..c01c1519 --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.balance; + +import java.util.List; +import org.apache.rocketmq.streams.common.channel.split.ISplit; + +public class SplitChanged { + + protected int splitCount;//变动多分片个数 + protected boolean isNewSplit;//是否新增,false是删除 + protected List<ISplit> changedSplits; + public SplitChanged(int splitCount,boolean isNewSplit){ + this.splitCount=splitCount; + this.isNewSplit=isNewSplit; + } + + public int getSplitCount() { + return splitCount; + } + + public void setSplitCount(int splitCount) { + this.splitCount = splitCount; + } + + public boolean isNewSplit() { + return isNewSplit; + } + + public void setNewSplit(boolean newSplit) { + isNewSplit = newSplit; + } + + public List<ISplit> getChangedSplits() { + return changedSplits; + } + + public void setChangedSplits(List<ISplit> changedSplits) { + this.changedSplits = changedSplits; + } +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java new file mode 100644 index 00000000..dc504e5d --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.balance.impl; + +import com.google.auto.service.AutoService; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.common.model.ServiceName; +import org.apache.rocketmq.streams.common.utils.MapKeyUtil; +import org.apache.rocketmq.streams.common.utils.RuntimeUtil; +import org.apache.rocketmq.streams.connectors.balance.AbstractBalance; +import org.apache.rocketmq.streams.connectors.balance.ISourceBalance; +import org.apache.rocketmq.streams.connectors.source.SourceInstance; +import org.apache.rocketmq.streams.lease.LeaseComponent; +import org.apache.rocketmq.streams.lease.model.LeaseInfo; +import org.apache.rocketmq.streams.lease.service.ILeaseService; + +@AutoService(ISourceBalance.class) +@ServiceName(LeaseBalanceImpl.DB_BALANCE_NAME) +public class LeaseBalanceImpl extends AbstractBalance { + + private static final Log logger = LogFactory.getLog(LeaseBalanceImpl.class); + + public static final String DB_BALANCE_NAME = "db_balance"; + private static final String REMOVE_SPLIT_LOCK_NAME = "lock_remove_split"; + private static final String SOURCE_LOCK_PREFIX = "SOURCE_"; + private static final String SPLIT_LOCK_PREFIX = "SPLIT_"; + protected transient LeaseComponent leaseComponent = LeaseComponent.getInstance(); + protected transient String 
sourceIdentification; + + protected int lockTimeSecond = 5; + + public LeaseBalanceImpl(String sourceIdentification) { + + this.sourceIdentification = sourceIdentification; + } + + public LeaseBalanceImpl() { + + } + + @Override + protected List<ISplit> fetchWorkingSplits(List<ISplit> allSplits) { + List<LeaseInfo> leaseInfos = leaseComponent.getService().queryLockedInstanceByNamePrefix(SPLIT_LOCK_PREFIX + this.sourceIdentification, null); + logger.info(String.format("lease SPLIT_LOCK_PREFIX is %s, sourceIdentification is %s. ", SPLIT_LOCK_PREFIX, sourceIdentification)); + if (leaseInfos == null) { + return new ArrayList<>(); + } + + Map<String, ISplit> allSplitMap = new HashMap<>(); + for (ISplit split : allSplits) { + allSplitMap.put(split.getQueueId(), split); + } + List<ISplit> splits = new ArrayList<>(); + for (LeaseInfo leaseInfo : leaseInfos) { + String leaseName = leaseInfo.getLeaseName(); + String splitId = MapKeyUtil.getLast(leaseName); + splits.add(allSplitMap.get(splitId)); + } + logger.info(String.format("working split is %s", Arrays.toString(splits.toArray()))); + return splits; + } + + @Override + protected List<SourceInstance> fetchSourceInstances() { + List<LeaseInfo> leaseInfos = leaseComponent.getService().queryLockedInstanceByNamePrefix(SOURCE_LOCK_PREFIX + sourceIdentification, null); + if (leaseInfos == null) { + return new ArrayList<>(); + } + List<SourceInstance> sourceInstances = new ArrayList<>(); + for (LeaseInfo leaseInfo : leaseInfos) { + String leaseName = leaseInfo.getLeaseName(); + sourceInstances.add(new SourceInstance(leaseName)); + } + return sourceInstances; + } + + @Override + protected boolean holdLockSourceInstance() { + return holdLock(SOURCE_LOCK_PREFIX + sourceIdentification, RuntimeUtil.getDipperInstanceId()); + } + + @Override + protected void unlockSourceInstance() { + leaseComponent.getService().unlock(SOURCE_LOCK_PREFIX + sourceIdentification, RuntimeUtil.getDipperInstanceId()); + } + + @Override + public boolean 
holdLockSplit(ISplit split) { + return holdLock(SPLIT_LOCK_PREFIX + this.sourceIdentification, split.getQueueId()); + } + + @Override + public void unlockSplit(ISplit split) { + leaseComponent.getService().unlock(SPLIT_LOCK_PREFIX + this.sourceIdentification, split.getQueueId()); + + } + + @Override + public boolean getRemoveSplitLock() { + return holdLock(this.sourceIdentification, REMOVE_SPLIT_LOCK_NAME); + } + + @Override + public void unLockRemoveSplitLock() { + leaseComponent.getService().unlock(this.sourceIdentification, REMOVE_SPLIT_LOCK_NAME); + } + + public String getSourceIdentification() { + return sourceIdentification; + } + + @Override + public void setSourceIdentification(String sourceIdentification) { + this.sourceIdentification = sourceIdentification; + } + + protected boolean holdLock(String name, String lockName) { + ILeaseService leaseService = leaseComponent.getService(); + boolean success = leaseService.holdLock(name, lockName, lockTimeSecond); + return success; + } + +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java new file mode 100644 index 00000000..9bf34803 --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.model; + +import org.apache.rocketmq.streams.common.context.MessageOffset; + +public class PullMessage<T> { + protected T message; + protected MessageOffset messageOffset; + + public T getMessage() { + return message; + } + + public void setMessage(T message) { + this.message = message; + } + + public MessageOffset getMessageOffset() { + return messageOffset; + } + + public void setMessageOffset(MessageOffset messageOffset) { + this.messageOffset = messageOffset; + } + /** + * 获取offset字符串,通过.把主offset和子offset串接在一起 + * @return + */ + public String getOffsetStr(){ + return this.messageOffset.getOffsetStr(); + } + public String getMainOffset() { + return messageOffset.getMainOffset(); + } +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java new file mode 100644 index 00000000..a4889b5f --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.model; + +import java.util.Date; +import java.util.List; +import org.apache.rocketmq.streams.common.model.Entity; +import org.apache.rocketmq.streams.db.driver.orm.ORMUtil; + +/** + * @description + */ +public class ReaderStatus extends Entity { + + /** + * 查询单个readerStatus + */ + static final String queryReaderStatusByUK = "select * from reader_status where source_name = '%s' and reader_name = '%s' and is_finished = 1"; + + static final String queryReaderStatusList = "select * from reader_status where source_name = '%s' and is_finished = 1"; + + static final String clearReaderStatus = "update reader_status set gmt_modified = now(), is_finished = -1 where source_name = '%s' and reader_name = '%s'"; + + String sourceName; + + String readerName; + + int isFinished; + + int totalReader; + + public String getReaderName() { + return readerName; + } + + public void setReaderName(String readerName) { + this.readerName = readerName; + } + + public int getIsFinished() { + return isFinished; + } + + public void setIsFinished(int isFinished) { + this.isFinished = isFinished; + } + + public int getTotalReader() { + return totalReader; + } + + public void setTotalReader(int totalReader) { + this.totalReader = totalReader; + } + + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + @Override + public String toString() { + return "ReaderStatus{" + + "id=" + id + + ", gmtCreate=" + gmtCreate + + ", gmtModified=" + 
gmtModified + + ", sourceName='" + sourceName + '\'' + + ", readerName='" + readerName + '\'' + + ", isFinished=" + isFinished + + ", totalReader=" + totalReader + + '}'; + } + + public static ReaderStatus queryReaderStatusByUK(String sourceName, String readerName) { + String sql = String.format(queryReaderStatusByUK, sourceName, readerName); + ReaderStatus readerStatus = ORMUtil.queryForObject(sql, null, ReaderStatus.class); + return readerStatus; + } + + public static List<ReaderStatus> queryReaderStatusListBySourceName(String sourceName) { + String sql = String.format(queryReaderStatusList, sourceName); + List<ReaderStatus> readerStatusList = ORMUtil.queryForList(sql, null, ReaderStatus.class); + return readerStatusList; + } + + public static void clearReaderStatus(String sourceName, String readerName) { + String sql = String.format(clearReaderStatus, sourceName, readerName); + ORMUtil.executeSQL(sql, null); + } + + public static ReaderStatus create(String sourceName, String readerName, int isFinished, int totalReader) { + + ReaderStatus readerStatus = new ReaderStatus(); + readerStatus.setSourceName(sourceName); + readerStatus.setReaderName(readerName); + readerStatus.setIsFinished(isFinished); + readerStatus.setTotalReader(totalReader); + readerStatus.setGmtCreate(new Date()); + readerStatus.setGmtModified(new Date()); + return readerStatus; + + } +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java new file mode 100644 index 00000000..268e891e --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.reader; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.rocketmq.streams.common.channel.source.ISource; +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.common.component.AbstractComponent; +import org.apache.rocketmq.streams.common.context.MessageOffset; +import org.apache.rocketmq.streams.common.utils.ThreadUtil; +import org.apache.rocketmq.streams.connectors.IBoundedSource; +import org.apache.rocketmq.streams.connectors.IBoundedSourceReader; +import org.apache.rocketmq.streams.connectors.model.PullMessage; +import org.apache.rocketmq.streams.connectors.model.ReaderStatus; +import org.apache.rocketmq.streams.connectors.source.CycleDynamicMultipleDBScanSource; +import org.apache.rocketmq.streams.db.driver.DriverBuilder; +import org.apache.rocketmq.streams.db.driver.JDBCDriver; +import org.apache.rocketmq.streams.db.driver.orm.ORMUtil; + +/** + * @description + */ +public class DBScanReader implements ISplitReader, IBoundedSourceReader, 
Serializable { + + private static final long serialVersionUID = 8172403250050893288L; + private static final Log logger = LogFactory.getLog(DBScanReader.class); + static final String sqlTemplate = "select * from %s where id >= %d and id < %d"; + + //是否完成了source的call back调用 + transient volatile boolean isFinishedCall = false; + ISource iSource; + String url; + String userName; + String password; + String tableName; + int batchSize; + long offset; + long offsetStart; + long offsetEnd; + long maxOffset; + long minOffset; + ISplit iSplit; + transient List<PullMessage> pullMessages; + volatile boolean interrupt = false; + volatile boolean isClosed = false; + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public ISplit getISplit() { + return iSplit; + } + + public void setISplit(ISplit iSplit) { + this.iSplit = iSplit; + } + + public DBScanReader() { + + } + + transient ThreadLocal<JDBCDriver> threadLocal = new ThreadLocal<JDBCDriver>() { + + @Override + public JDBCDriver initialValue() { + logger.info(String.format("%s initial jdbcDriver. 
", Thread.currentThread().getName())); + return DriverBuilder.createDriver(AbstractComponent.DEFAULT_JDBC_DRIVER, url, userName, password); + } + + }; + + @Override + public void open(ISplit split) { + this.iSplit = split; + JDBCDriver jdbcDriver = threadLocal.get(); + Map<String, Object> range = jdbcDriver.queryOneRow("select min(id) as min_id, max(id) as max_id from " + tableName); + minOffset = Long.parseLong(String.valueOf(range.get("min_id"))); + maxOffset = Long.parseLong(String.valueOf(range.get("max_id"))); + offsetStart = minOffset; + offset = minOffset; + logger.info(String.format("table %s min id [ %d ], max id [ %d ]", tableName, minOffset, maxOffset)); + pullMessages = new ArrayList<>(); + } + + @Override + public boolean next() { + if (interrupt) { + return false; + } + if (isFinished()) { + finish(); + ThreadUtil.sleep(10 * 1000); + return false; + } + JDBCDriver jdbcDriver = threadLocal.get(); + offsetEnd = offsetStart + batchSize; + String batchQuery = String.format(sqlTemplate, tableName, offsetStart, offsetEnd); + logger.debug(String.format("execute sql : %s", batchQuery)); + List<Map<String, Object>> resultData = jdbcDriver.queryForList(batchQuery); + offsetStart = offsetEnd; + pullMessages.clear(); + for (Map<String, Object> r : resultData) { + PullMessage msg = new PullMessage(); + JSONObject data = JSONObject.parseObject(JSON.toJSONString(r)); + msg.setMessage(data); + offset = offset > Long.parseLong(data.getString("id")) ? 
offset : Long.parseLong(data.getString("id")); + msg.setMessageOffset(new MessageOffset(String.valueOf(offset), true)); + pullMessages.add(msg); + } + return offsetStart - batchSize <= maxOffset; + } + + @Override + public List<PullMessage> getMessage() { +// logger.info(String.format("output messages %d", pullMessages.size())); + return pullMessages; + } + + @Override + public SplitCloseFuture close() { +// interrupt = true; + isClosed = true; + threadLocal.remove(); + pullMessages = null; + return new SplitCloseFuture(this, iSplit); + } + + @Override + public void seek(String cursor) { + if (cursor == null || cursor.trim().equals("")) { + cursor = "0"; + } + offset = Long.parseLong(cursor); + if (offset < minOffset) { + offset = minOffset; + } + offsetStart = offset; + logger.info(String.format("split %s seek %d.", iSplit.getQueueId(), offset)); + } + + @Override + public String getProgress() { + return String.valueOf(offset); + } + + @Override + public long getDelay() { + return maxOffset - offset; + } + + @Override + public long getFetchedDelay() { + return 0; + } + + @Override + public boolean isClose() { + return isClosed; + } + + @Override + public ISplit getSplit() { + return iSplit; + } + + @Override + public boolean isInterrupt() { + return interrupt; + } + + @Override + public boolean interrupt() { + interrupt = true; + return true; + } + + @Override + public boolean isFinished() { + return offsetStart > maxOffset; + } + + @Override + public void finish() { + if (isFinishedCall) { + return; + } + pullMessages = null; + updateReaderStatus(); + IBoundedSource tmp = (IBoundedSource) iSource; + tmp.boundedFinishedCallBack(this.iSplit); + isFinishedCall = true; + } + + public ISource getISource() { + return iSource; + } + + public void setISource(ISource iSource) { + this.iSource = iSource; + } + + private final void updateReaderStatus() { + String sourceName = CycleDynamicMultipleDBScanSource.createKey(this.getISource()); + int finish = Integer.valueOf(1); + 
int total = ((CycleDynamicMultipleDBScanSource) iSource).getTotalReader(); + ReaderStatus readerStatus = ReaderStatus.create(sourceName, iSplit.getQueueId(), finish, total); + logger.info(String.format("create reader status %s.", readerStatus)); + ORMUtil.batchReplaceInto(readerStatus); + } + +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java new file mode 100644 index 00000000..6b377cff --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.reader; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.connectors.model.PullMessage; + +public interface ISplitReader extends Serializable { + + /** + * Open. 
+ * + * @param split the split + * @throws IOException the io exception + */ + void open(ISplit split); + + /** + * Next boolean. + * + * @return the boolean + * @throws IOException the io exception + * @throws InterruptedException the interrupted exception + */ + boolean next(); + + /** + * Gets message. + * + * @return the message + */ + List<PullMessage> getMessage(); + + /** + * Close. + * + * @throws IOException the io exception + */ + SplitCloseFuture close(); + + /** + * Seek. + * + * @param cursor the cursor + * @throws IOException the io exception + */ + void seek(String cursor); + + /** + * Gets progress. + * + * @return the progress + * @throws IOException the io exception + */ + String getProgress(); + + /** + * Get message delay (millseconds) + * + * @return delay + */ + long getDelay(); + + /** + * Get message delay (millseconds) from being fetched + * + * @return delay + */ + long getFetchedDelay(); + + boolean isClose(); + + ISplit getSplit(); + + boolean isInterrupt(); + + boolean interrupt(); + +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java new file mode 100644 index 00000000..b28748b8 --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.reader; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.rocketmq.streams.common.channel.split.ISplit; + +public class SplitCloseFuture implements Future<Boolean> { + + protected ISplitReader reader; + protected ISplit split; + + public SplitCloseFuture(ISplitReader reader, ISplit split) { + this.reader = reader; + this.split = split; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return reader.isClose(); + } + + @Override + public Boolean get() throws InterruptedException, ExecutionException { + synchronized (reader) { + reader.wait(); + } + return reader.isClose(); + } + + @Override + public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + synchronized (reader) { + long time = timeout; + if (unit == TimeUnit.SECONDS) { + time = time * 1000; + } else if (unit == TimeUnit.MINUTES) { + time = time * 1000 * 60; + } else if (unit == TimeUnit.HOURS) { + time = time * 1000 * 60 * 60; + } else { + throw new RuntimeException("can not support this timeout, expect less hour " + timeout + " the unit is " + unit); + } + reader.wait(time); + } + return reader.isClose(); + } + + public ISplitReader getReader() { + return reader; + } + + public ISplit 
getSplit() { + return split; + } +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java new file mode 100644 index 00000000..9aedc95a --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.streams.connectors.source; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.rocketmq.streams.common.channel.source.AbstractSource; +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.common.checkpoint.CheckPoint; +import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager; +import org.apache.rocketmq.streams.common.context.Message; +import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory; +import org.apache.rocketmq.streams.common.utils.MapKeyUtil; +import org.apache.rocketmq.streams.connectors.balance.ISourceBalance; +import org.apache.rocketmq.streams.connectors.balance.SplitChanged; +import org.apache.rocketmq.streams.connectors.balance.impl.LeaseBalanceImpl; +import org.apache.rocketmq.streams.connectors.model.PullMessage; +import org.apache.rocketmq.streams.connectors.reader.ISplitReader; +import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture; +import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent; + +public abstract class AbstractPullSource extends AbstractSource implements IPullSource<AbstractSource> { + + private static final Log 
logger = LogFactory.getLog(AbstractPullSource.class); + + protected transient ISourceBalance balance;// balance interface + protected transient ScheduledExecutorService balanceExecutor;//schdeule balance + protected transient Map<String, ISplitReader> splitReaders = new HashMap<>();//owner split readers + protected transient Map<String, ISplit> ownerSplits = new HashMap<>();//working splits by the source instance + + //可以有多种实现,通过名字选择不同的实现 + protected String balanceName = LeaseBalanceImpl.DB_BALANCE_NAME; + //balance schedule time + protected int balanceTimeSecond = 10; + protected long pullIntervalMs; + protected transient CheckPointManager checkPointManager = new CheckPointManager(); + protected transient boolean shutDown=false; + @Override + protected boolean startSource() { + ServiceLoaderComponent serviceLoaderComponent = ServiceLoaderComponent.getInstance(ISourceBalance.class); + balance = (ISourceBalance) serviceLoaderComponent.getService().loadService(balanceName); + balance.setSourceIdentification(MapKeyUtil.createKey(getNameSpace(), getConfigureName())); + balanceExecutor = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("balance-task-%d").daemon(true).build()); + List<ISplit> allSplits = fetchAllSplits(); + SplitChanged splitChanged = balance.doBalance(allSplits, new ArrayList(ownerSplits.values())); + doSplitChanged(splitChanged); + balanceExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + logger.info("balance running..... 
current splits is " + ownerSplits); + List<ISplit> allSplits = fetchAllSplits(); + SplitChanged splitChanged = balance.doBalance(allSplits, new ArrayList(ownerSplits.values())); + doSplitChanged(splitChanged); + } + }, balanceTimeSecond, balanceTimeSecond, TimeUnit.SECONDS); + + startWorks(); + return true; + } + + private void startWorks() { + ExecutorService workThreads= ThreadPoolFactory.createThreadPool(maxThread); + long start=System.currentTimeMillis(); + while (!shutDown) { + Iterator<Map.Entry<String, ISplitReader>> it = splitReaders.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, ISplitReader> entry=it.next(); + String splitId=entry.getKey(); + ISplit split=ownerSplits.get(splitId); + ISplitReader reader=entry.getValue(); + ReaderRunner runner=new ReaderRunner(split,reader); + workThreads.execute(runner); + } + try { + long sleepTime=this.pullIntervalMs-(System.currentTimeMillis()-start); + if(sleepTime>0){ + Thread.sleep(sleepTime); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + @Override + public Map<String, ISplit> getAllSplitMap() { + List<ISplit> splits = fetchAllSplits(); + if (splits == null) { + return new HashMap<>(); + } + Map<String, ISplit> splitMap = new HashMap<>(); + for (ISplit split : splits) { + splitMap.put(split.getQueueId(), split); + } + return splitMap; + } + + protected void doSplitChanged(SplitChanged splitChanged) { + if (splitChanged == null) { + return; + } + if (splitChanged.getSplitCount() == 0) { + return; + } + if (splitChanged.isNewSplit()) { + doSplitAddition(splitChanged.getChangedSplits()); + } else { + doSplitRelease(splitChanged.getChangedSplits()); + } + } + + protected void doSplitAddition(List<ISplit> changedSplits) { + if (changedSplits == null) { + return; + } + Set<String> splitIds = new HashSet<>(); + for (ISplit split : changedSplits) { + splitIds.add(split.getQueueId()); + } + addNewSplit(splitIds); + for (ISplit split : changedSplits) { + ISplitReader 
reader = createSplitReader(split); + reader.open(split); + reader.seek(loadSplitOffset(split)); + splitReaders.put(split.getQueueId(), reader); + this.ownerSplits.put(split.getQueueId(), split); +// logger.info("start next"); +// Thread thread = new Thread(new Runnable() { +// +// thread.setName("reader-task-" + reader.getSplit().getQueueId()); +// thread.start(); + } + + } + + @Override + public String loadSplitOffset(ISplit split) { + String offset = null; + CheckPoint<String> checkPoint = checkPointManager.recover(this, split); + if (checkPoint != null) { + offset = JSON.parseObject(checkPoint.getData()).getString("offset"); + } + return offset; + } + + protected abstract ISplitReader createSplitReader(ISplit split); + + protected void doSplitRelease(List<ISplit> changedSplits) { + boolean success = balance.getRemoveSplitLock(); + if (!success) { + return; + } + try { + List<SplitCloseFuture> closeFutures = new ArrayList<>(); + for (ISplit split : changedSplits) { + ISplitReader reader = this.splitReaders.get(split.getQueueId()); + if (reader == null) { + continue; + } + SplitCloseFuture future = reader.close(); + closeFutures.add(future); + } + for (SplitCloseFuture future : closeFutures) { + try { + if(!future.isDone()){ + future.get(); + } + this.splitReaders.remove(future.getSplit().getQueueId()); + this.ownerSplits.remove(future.getSplit().getQueueId()); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + + } finally { + balance.unLockRemoveSplitLock(); + } + + } + + + protected class ReaderRunner implements Runnable{ + long mLastCheckTime = System.currentTimeMillis(); + protected ISplit split; + protected ISplitReader reader; + + public ReaderRunner(ISplit split,ISplitReader reader){ + this.split=split; + this.reader=reader; + } + + @Override + public void run() { + logger.info("start running"); + if (reader.isInterrupt() == false) { + if (reader.next()) { + List<PullMessage> 
messages = reader.getMessage(); + if (messages != null) { + for (PullMessage pullMessage : messages) { + String queueId = split.getQueueId(); + String offset = pullMessage.getOffsetStr(); + JSONObject msg = createJson(pullMessage.getMessage()); + Message message = createMessage(msg, queueId, offset, false); + message.getHeader().setOffsetIsLong(pullMessage.getMessageOffset().isLongOfMainOffset()); + executeMessage(message); + } + } + reader.notifyAll(); + } + long curTime = System.currentTimeMillis(); + if (curTime - mLastCheckTime > getCheckpointTime()) { + sendCheckpoint(reader.getSplit().getQueueId()); + mLastCheckTime = curTime; + } + + + }else { + Set<String> removeSplits = new HashSet<>(); + removeSplits.add(reader.getSplit().getQueueId()); + removeSplit(removeSplits); + balance.unlockSplit(split); + reader.close(); + synchronized (reader) { + reader.notifyAll(); + } + } + + } + + } + + @Override + public boolean supportNewSplitFind() { + return true; + } + + @Override + public boolean supportRemoveSplitFind() { + return true; + } + + @Override + public boolean supportOffsetRest() { + return true; + } + + @Override + public Long getPullIntervalMs() { + return pullIntervalMs; + } + + public String getBalanceName() { + return balanceName; + } + + public void setBalanceName(String balanceName) { + this.balanceName = balanceName; + } + + public int getBalanceTimeSecond() { + return balanceTimeSecond; + } + + public void setBalanceTimeSecond(int balanceTimeSecond) { + this.balanceTimeSecond = balanceTimeSecond; + } + + public void setPullIntervalMs(long pullIntervalMs) { + this.pullIntervalMs = pullIntervalMs; + } + + @Override + public List<ISplit> ownerSplits() { + return new ArrayList(ownerSplits.values()); + } + +} \ No newline at end of file diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java 
b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java new file mode 100644 index 00000000..561b48f2 --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.streams.connectors.source; + +import com.alibaba.fastjson.JSONObject; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.rocketmq.streams.common.channel.source.AbstractSource; +import org.apache.rocketmq.streams.common.channel.source.ISource; +import org.apache.rocketmq.streams.common.channel.source.systemmsg.ChangeTableNameMessage; +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.common.context.Message; +import org.apache.rocketmq.streams.common.metadata.MetaDataUtils; +import org.apache.rocketmq.streams.common.utils.MapKeyUtil; +import org.apache.rocketmq.streams.common.utils.ThreadUtil; +import org.apache.rocketmq.streams.connectors.IBoundedSource; +import org.apache.rocketmq.streams.connectors.model.ReaderStatus; +import org.apache.rocketmq.streams.connectors.reader.ISplitReader; +import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule; +import org.apache.rocketmq.streams.connectors.source.filter.CycleScheduleFilter; +import org.apache.rocketmq.streams.db.CycleSplit; + +/** + * @description + */ +public class CycleDynamicMultipleDBScanSource extends DynamicMultipleDBScanSource implements IBoundedSource, Serializable { + + private static final long serialVersionUID = 6840988298037061128L; + private static final Log logger = LogFactory.getLog(CycleDynamicMultipleDBScanSource.class); + + Map<String, Boolean> initReaderMap = new ConcurrentHashMap<>(); + CycleSchedule.Cycle cycle; + transient AtomicInteger size = new AtomicInteger(0); + + public CycleDynamicMultipleDBScanSource() { + super(); + } + + public CycleDynamicMultipleDBScanSource(CycleSchedule.Cycle cycle) { + super(); + 
this.cycle = cycle; + } + + public AtomicInteger getSize() { + return size; + } + + public void setSize(AtomicInteger size) { + this.size = size; + } + + /** + * @return + */ + //todo + @Override + public synchronized List<ISplit> fetchAllSplits() { + + if (this.filter == null) { + filter = new CycleScheduleFilter(cycle.getAllPattern()); + } + + //如果还是当前周期, 已经完成全部分区的加载, 则不在加载 + if (size.get() == cycle.getCycleCount()) { + return splits; + } + String sourceName = createKey(this); + List<String> tableNames = MetaDataUtils.listTableNameByPattern(url, userName, password, logicTableName + "%"); + + logger.info(String.format("load all logic table : %s", Arrays.toString(tableNames.toArray()))); + Iterator<String> it = tableNames.iterator(); + while (it.hasNext()) { + String s = it.next(); + String suffix = s.replace(logicTableName + "_", ""); + if (filter.filter(sourceName, logicTableName, suffix)) { + logger.info(String.format("filter add %s", s)); + CycleSplit split = new CycleSplit(); + split.setLogicTableName(logicTableName); + split.setSuffix(suffix); + split.setCyclePeriod(cycle.getCycleDateStr()); + String splitId = split.getQueueId(); + if (initReaderMap.get(splitId) == null) { + initReaderMap.put(splitId, false); + splits.add(split); + size.incrementAndGet(); + } + } else { + logger.info(String.format("filter remove %s", s)); + it.remove(); + } + } + + this.tableNames = tableNames; + return splits; + } + + public Map<String, Boolean> getInitReaderMap() { + return initReaderMap; + } + + public void setInitReaderMap(Map<String, Boolean> initReaderMap) { + this.initReaderMap = initReaderMap; + } + + @Override + public void finish() { + super.finish(); + for (Map.Entry<String, Boolean> entry : initReaderMap.entrySet()) { + String key = entry.getKey(); + Boolean value = entry.getValue(); + if (value == false) { + logger.error(String.format("split[%s] reader is not finish, exit with error. 
", key)); + } + } + this.initReaderMap.clear(); + this.initReaderMap = null; + splits.clear(); + splits = null; + } + + @Override + public boolean isFinished() { + List<ReaderStatus> readerStatuses = ReaderStatus.queryReaderStatusListBySourceName(createKey(this)); + if (readerStatuses == null) { + return false; + } + return readerStatuses.size() == size.get(); + } + + @Override + protected ISplitReader createSplitReader(ISplit iSplit) { + return super.createSplitReader(iSplit); + } + + private void sendChangeTableNameMessage() { + logger.info(String.format("start send change table name message.")); + ChangeTableNameMessage changeTableNameMessage = new ChangeTableNameMessage(); + changeTableNameMessage.setScheduleCycle(cycle.getCycleDateStr()); + Message message = createMessage(new JSONObject(), null, null, false); + message.setSystemMessage(changeTableNameMessage); + message.getHeader().setSystemMessage(true); + executeMessage(message); + logger.info(String.format("finish send change table name message.")); + } + + @Override + public synchronized void boundedFinishedCallBack(ISplit iSplit) { + this.initReaderMap.put(iSplit.getQueueId(), true); + logger.info(String.format("current map is %s, key is %s. 
", initReaderMap, iSplit.getQueueId())); + if (statusCheckerStart.compareAndSet(false, true)) { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + while (!isFinished()) { + ThreadUtil.sleep(3 * 1000); + } + logger.info(String.format("source will be closed.")); + sendChangeTableNameMessage(); //下发修改name的消息 + ThreadUtil.sleep(1 * 1000); + finish(); + } + + }); + thread.setName(createKey(this) + "_callback"); + thread.start(); + } + } + + public CycleSchedule.Cycle getCycle() { + return cycle; + } + + public void setCycle(CycleSchedule.Cycle cycle) { + this.cycle = cycle; + } + + @Override + public String createCheckPointName() { + return super.createCheckPointName(); + } + + public synchronized int getTotalReader() { + return size.get(); + } + + public static String createKey(ISource iSource) { + AbstractSource source = (AbstractSource) iSource; + CycleSchedule.Cycle cycle = ((CycleDynamicMultipleDBScanSource) iSource).getCycle(); + return MapKeyUtil.createKey(source.getNameSpace(), source.getGroupName(), source.getConfigureName(), source.getTopic(), cycle.getCycleDateStr()); + } + +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java new file mode 100644 index 00000000..ea2a118b --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.source; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.common.metadata.MetaDataUtils; +import org.apache.rocketmq.streams.connectors.reader.DBScanReader; +import org.apache.rocketmq.streams.connectors.reader.ISplitReader; +import org.apache.rocketmq.streams.connectors.source.filter.DataFormatPatternFilter; +import org.apache.rocketmq.streams.connectors.source.filter.PatternFilter; +import org.apache.rocketmq.streams.db.DynamicMultipleDBSplit; + +/** + * @description DynamicMultipleDBScanSource + */ +public class DynamicMultipleDBScanSource extends AbstractPullSource implements Serializable { + + private static final long serialVersionUID = 3987103552547019739L; + private static final Log logger = LogFactory.getLog(DynamicMultipleDBScanSource.class); + public static final int DEFAULT_BATCH_SIZE = 50; + public static final int MAX_BATCH_SIZE = 100; + + String url; + String userName; + String password; + String logicTableName; + String suffix; + int batchSize; + List<String> tableNames; + List<ISplit> splits; + transient volatile AtomicBoolean statusCheckerStart = new AtomicBoolean(false); + + //todo + transient PatternFilter filter; + + public DynamicMultipleDBScanSource() { + splits = new ArrayList<>(); 
+    }
+
+    @Override
+    protected boolean initConfigurable() {
+        setTopic(logicTableName);
+        return super.initConfigurable();
+    }
+
+    /**
+     * Tells whether {@code queueId} is one of the physical table names loaded by the last
+     * {@link #fetchAllSplits()} call. Returns {@code false} while no table list has been loaded yet
+     * (previously this threw a NullPointerException).
+     */
+    @Override
+    protected boolean isNotDataSplit(String queueId) {
+        return tableNames != null && tableNames.contains(queueId);
+    }
+
+    /**
+     * Builds a {@link DBScanReader} for one physical table. The batch size falls back to
+     * {@code DEFAULT_BATCH_SIZE} when not set (<= 0) and is capped at {@code MAX_BATCH_SIZE}.
+     */
+    @Override
+    protected ISplitReader createSplitReader(ISplit split) {
+
+        DBScanReader reader = new DBScanReader();
+        reader.setISplit(split);
+        reader.setUrl(url);
+        reader.setUserName(userName);
+        reader.setPassword(password);
+        reader.setTableName(String.valueOf(split.getQueue()));
+        int local = batchSize <= 0 ? DEFAULT_BATCH_SIZE : batchSize;
+        local = local > MAX_BATCH_SIZE ? MAX_BATCH_SIZE : local;
+        reader.setBatchSize(local);
+        reader.setISource(this);
+        logger.info(String.format("create reader for split %s", split.getQueueId()));
+        return reader;
+    }
+
+    /**
+     * Lists every table matching {@code logicTableName + "%"} and returns one split per table whose
+     * suffix (the name minus the leading {@code logicTableName + "_"}) passes {@link #filter}.
+     * A {@link DataFormatPatternFilter} is installed when no filter was configured.
+     *
+     * <p>The split list is rebuilt on every call and stored in {@link #splits}. Earlier versions appended
+     * to the existing list, so repeated calls accumulated duplicate splits.
+     *
+     * @return the splits selected by the filter
+     */
+    @Override
+    public List<ISplit> fetchAllSplits() {
+
+        if (filter == null) {
+            filter = new DataFormatPatternFilter();
+        }
+
+        tableNames = MetaDataUtils.listTableNameByPattern(url, userName, password, logicTableName + "%");
+
+        logger.info(String.format("load all logic table : %s", Arrays.toString(tableNames.toArray())));
+
+        String prefix = logicTableName + "_";
+        List<ISplit> result = new java.util.ArrayList<>();
+        for (String s : tableNames) {
+            // Strip only the leading prefix; String.replace would also remove later occurrences inside the name.
+            String suffix = s.startsWith(prefix) ? s.substring(prefix.length()) : s;
+            if (filter.filter(null, logicTableName, suffix)) {
+                logger.info(String.format("filter add %s", s));
+                DynamicMultipleDBSplit split = new DynamicMultipleDBSplit();
+                split.setLogicTableName(logicTableName);
+                split.setSuffix(suffix);
+                result.add(split);
+            } else {
+                logger.info(String.format("filter remove %s", s));
+            }
+
+        }
+        splits = result;
+        return splits;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getLogicTableName() {
+        return logicTableName;
+    }
+
+    public void setLogicTableName(String logicTableName) {
+        this.logicTableName = logicTableName;
+    }
+
+    public String getSuffix() {
+        return suffix;
+    }
+
+    public void setSuffix(String suffix) {
+        this.suffix = suffix;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public List<String> getTableNames() {
+        return tableNames;
+    }
+
+    public void setTableNames(List<String> tableNames) {
+        this.tableNames = tableNames;
+    }
+
+    public List<ISplit> getSplits() {
+        return splits;
+    }
+
+    public void setSplits(List<ISplit> splits) {
+        this.splits = splits;
+    }
+
+    public PatternFilter getFilter() {
+        return filter;
+    }
+
+    public void setFilter(PatternFilter filter) {
+        this.filter = filter;
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
new file mode 100644
index 00000000..6733911d
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+/**
+ * A source that polls messages itself; its splits need to be balanced across instances.
+ */
+public interface IPullSource<T extends ISource> extends ISource<T> {
+
+    /**
+     * Returns the splits currently owned by this source instance.
+     *
+     * @return the owned splits
+     */
+    Collection<ISplit> ownerSplits();
+
+    /**
+     * Fetches every split of the source.
+     *
+     * @return all splits of the source; some implementations may return {@code null}
+     */
+    List<ISplit> fetchAllSplits();
+
+    /**
+     * Returns all splits of the source as a map.
+     * NOTE(review): the key is presumably the split's queue id; confirm against the implementations.
+     *
+     * @return all splits of the source, keyed by id
+     */
+    Map<String, ISplit> getAllSplitMap();
+
+    /**
+     * Returns the interval between two pulls. The method name indicates the unit is milliseconds.
+     *
+     * @return the pull interval
+     */
+    Long getPullIntervalMs();
+
+    /**
+     * Loads the stored cursor (offset) of a split.
+     *
+     * @param split the split whose cursor is requested
+     * @return the stored cursor; may be {@code null}
+     */
+    String loadSplitOffset(ISplit split);
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/MutilBatchTaskSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/MutilBatchTaskSource.java
new file mode 100644
index 00000000..d82ba32a
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/MutilBatchTaskSource.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import com.alibaba.fastjson.JSON;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
+import org.apache.rocketmq.streams.common.topology.ChainPipeline;
+import org.apache.rocketmq.streams.common.topology.model.Pipeline;
+import org.apache.rocketmq.streams.common.topology.task.TaskAssigner;
+import org.apache.rocketmq.streams.common.utils.DipperThreadLocalUtil;
+import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
+import org.apache.rocketmq.streams.connectors.model.PullMessage;
+import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
+import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture;
+
+/**
+ * Pull source whose splits are the pipelines assigned to this task by {@link TaskAssigner}s.
+ * Reading a split starts that pipeline's channel.
+ */
+public class MutilBatchTaskSource extends AbstractPullSource {
+
+    /**
+     * Creates a reader for a {@link PipelineSplit}. The reader emits no messages itself: each
+     * {@code getMessage()} call starts the split's pipeline channel and returns {@code null}.
+     *
+     * @param split the split to read; used until {@code open} supplies another one
+     * @return a reader bound to {@code split}
+     */
+    @Override protected ISplitReader createSplitReader(ISplit split) {
+        final ISplit initialSplit = split;
+        return new ISplitReader() {
+            // This field shadows the method parameter. Without an initializer it stayed null until open()
+            // was called, which broke getSplit(), close() and getMessage() for readers never opened.
+            protected transient ISplit split = initialSplit;
+            protected boolean isInterrupt;
+            protected boolean isClose;
+            protected transient AtomicLong offsetGenerator=new AtomicLong(1000000000);
+            @Override public void open(ISplit split) {
+                this.split=split;
+            }
+
+            @Override public boolean next() {
+                return true;
+            }
+
+            // Starts the pipeline channel of the split; there are no pulled messages to hand back.
+            @Override public List<PullMessage> getMessage() {
+                PipelineSplit pipelineSplit=(PipelineSplit)split;
+                pipelineSplit.getQueue().startChannel();
+                return null;
+            }
+
+            @Override public SplitCloseFuture close() {
+                isClose=true;
+                return new SplitCloseFuture(this,split);
+            }
+
+            @Override public void seek(String cursor) {
+
+            }
+
+            // Synthetic progress token: instance id plus a per-reader counter that increases on every call.
+            @Override public String getProgress() {
+                return RuntimeUtil.getDipperInstanceId()+"_"+offsetGenerator.incrementAndGet();
+            }
+
+            @Override public long getDelay() {
+                return 0;
+            }
+
+            @Override public long getFetchedDelay() {
+                return getPullIntervalMs();
+            }
+
+            @Override public boolean isClose() {
+                return isClose;
+            }
+
+            @Override public ISplit getSplit() {
+                return split;
+            }
+
+            @Override public boolean isInterrupt() {
+                return isInterrupt;
+            }
+
+            @Override public boolean interrupt() {
+                isInterrupt=true;
+                return isInterrupt;
+            }
+        };
+    }
+
+    @Override protected boolean isNotDataSplit(String queueId) {
+        return false;
+    }
+
+    /**
+     * Returns one {@link PipelineSplit} per pipeline whose {@link TaskAssigner} names this source's task
+     * ({@code getConfigureName()}). Assigners without a pipeline name, and pipelines that cannot be found,
+     * are skipped.
+     *
+     * @return the pipeline splits, or {@code null} when no task assigner is configured at all
+     */
+    @Override public List<ISplit> fetchAllSplits() {
+
+        List<TaskAssigner> taskAssigners = configurableService.queryConfigurableByType(TaskAssigner.TYPE);
+        if (taskAssigners == null) {
+            return null;
+        }
+        String taskName = getConfigureName();
+        List<ISplit> splits=new ArrayList<>();
+        for (TaskAssigner taskAssigner : taskAssigners) {
+            if (!taskName.equals(taskAssigner.getTaskName())) {
+                continue;
+            }
+            String pipelineName = taskAssigner.getPipelineName();
+            if(pipelineName!=null){
+                ChainPipeline<?> pipeline = configurableService.queryConfigurable(Pipeline.TYPE, pipelineName);
+                if (pipeline != null) {
+                    splits.add(new PipelineSplit(pipeline));
+                }
+            }
+        }
+        return splits;
+    }
+
+
+    /**
+     * A split wrapping one pipeline. Its queue id is the pipeline's configure name.
+     * Declared {@code static} because it never uses the enclosing source, so instances do not keep a
+     * hidden reference to it.
+     */
+    protected static class PipelineSplit implements ISplit<PipelineSplit, ChainPipeline>{
+        protected ChainPipeline chainPipeline;
+        public PipelineSplit(ChainPipeline chainPipeline){
+            this.chainPipeline=chainPipeline;
+        }
+
+        @Override public String getQueueId() {
+            return chainPipeline.getConfigureName();
+        }
+
+        @Override public ChainPipeline getQueue() {
+            return chainPipeline;
+        }
+
+        @Override public int compareTo(PipelineSplit o) {
+            return chainPipeline.getConfigureName().compareTo(o.getQueueId());
+        }
+
+        @Override public String toJson() {
+            return chainPipeline.toJson();
+        }
+
+        @Override public void toObject(String jsonString) {
+            ChainPipeline pipeline=new ChainPipeline();
+            pipeline.toObject(jsonString);
+            this.chainPipeline=pipeline;
+        }
+    }
+
+    @Override
+    public String loadSplitOffset(ISplit split) {
+        return null;
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/SourceInstance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/SourceInstance.java
new file mode 100644
index 00000000..c0da5b6e
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/SourceInstance.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+/**
+ * Value holder for the id of a single message-queue source instance.
+ */
+public class SourceInstance {
+
+    /** Identifier of this source instance; mutable through {@link #setSourceInstanceId(String)}. */
+    protected String sourceInstanceId;
+
+    /**
+     * @param sourceInstanceId the instance id to hold (stored as given, {@code null} allowed)
+     */
+    public SourceInstance(String sourceInstanceId) {
+        this.sourceInstanceId = sourceInstanceId;
+    }
+
+    /** Replaces the held instance id. */
+    public void setSourceInstanceId(String sourceInstanceId) {
+        this.sourceInstanceId = sourceInstanceId;
+    }
+
+    /** @return the held instance id */
+    public String getSourceInstanceId() {
+        return this.sourceInstanceId;
+    }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/AbstractPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/AbstractPatternFilter.java
new file mode 100644
index 00000000..0d5368a7
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/AbstractPatternFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+
+/**
+ * Base class for chainable table-name filters. {@link #setNext(PatternFilter)} links the next filter.
+ */
+public abstract class AbstractPatternFilter implements PatternFilter, Serializable {
+
+    private static final long serialVersionUID = 6500945777421871431L;
+
+    // Next filter in the chain, or null; subclasses in this package read it directly.
+    PatternFilter next;
+
+    /**
+     * Evaluates this filter for one table.
+     *
+     * @param sourceName     name of the source; some callers pass {@code null}
+     * @param logicTableName logical table name
+     * @param tableName      physical table name or its suffix, depending on the caller
+     * @return this filter's verdict for the table
+     */
+    public abstract boolean filter(String sourceName, String logicTableName, String tableName);
+
+    /**
+     * Links {@code filter} as the next element of the chain.
+     *
+     * @param filter the filter to consult next
+     * @return this filter (not the appended one)
+     */
+    @Override
+    public PatternFilter setNext(PatternFilter filter) {
+        next = filter;
+        return this;
+    }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
new file mode 100644
index 00000000..c06de98d
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
+
+/**
+ * Filters out readers that have already finished: a table for which a {@link ReaderStatus} record exists
+ * yields {@code true}; otherwise the decision is delegated to the next filter ({@code false} if none).
+ */
+@Deprecated
+public class BoundedPatternFilter extends AbstractPatternFilter implements Serializable {
+
+    static final Log logger = LogFactory.getLog(BoundedPatternFilter.class);
+
+    @Override
+    public boolean filter(String sourceName, String logicTableName, String tableName) {
+        String uniqueKey = logicTableName + "_" + tableName;
+        ReaderStatus finished = ReaderStatus.queryReaderStatusByUK(sourceName, uniqueKey);
+        if (finished == null) {
+            // No record of a finished reader: let the rest of the chain decide, if there is one.
+            return next != null && next.filter(sourceName, logicTableName, tableName);
+        }
+        logger.info(String.format("filter sourceName %s, logicTableName %s, suffix %s. ", sourceName, logicTableName, tableName));
+        logger.info(String.format("query result %s", finished.toString()));
+        return true;
+    }
+
+    /** Delegates to the base implementation and returns this filter. */
+    @Override
+    public PatternFilter setNext(PatternFilter filter) {
+        super.setNext(filter);
+        return this;
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java
new file mode 100644
index 00000000..3a0193f3
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Partition selection by scheduling cycle. For the current cycle it keeps the formatted dates of the
+ * previous {@code cycle} periods (see {@link #getAllPatterns()}). {@link #filter} accepts a table
+ * suffix when it is one of those dates.
+ */
+public class CyclePatternFilter extends AbstractPatternFilter implements Serializable {
+
+    private static final long serialVersionUID = -5151597286296228754L;
+
+    public static final int INIT_CYCLE_VERSION = 0;
+
+    CyclePeriod cyclePeriod;
+
+    Date curCycleDateTime; // time of the current scheduling cycle
+
+    long cycleId;
+
+    String firstStartTime; // earliest (oldest) pattern of the current cycle; null when there are none
+
+    List<String> allPatterns;
+
+    String expression;
+
+    boolean isInit;
+
+    // Used when reading history data: offset in milliseconds between the current cycle time and the
+    // history date. getCyclePeriodDiff() converts it to a number of cycles.
+    final long cycleDiff;
+
+    /**
+     * @param expr cycle expression such as {@code "yyyyMMddHHmm - 15m"}, parsed by {@link CyclePeriod#getInstance}
+     * @param date reference time used to compute the initial cycle
+     * @throws ParseException if a history date in {@code expr} cannot be parsed
+     */
+    // TODO: expression parsing
+    public CyclePatternFilter(String expr, Date date) throws ParseException {
+        expression = expr;
+        cycleId = INIT_CYCLE_VERSION;
+        cyclePeriod = CyclePeriod.getInstance(expression);
+        curCycleDateTime = calCycleDateTime(date);
+        allPatterns = new ArrayList<>();
+        isInit = true;
+        if(cyclePeriod.isHistory){
+            Date tmp = cyclePeriod.getHisDate();
+            cycleDiff = curCycleDateTime.getTime()/1000 * 1000 - tmp.getTime()/1000*1000;
+        }else{
+            cycleDiff = 0;
+        }
+    }
+
+
+    /**
+     * Truncates {@code date} to the start of its cycle.
+     *
+     * @return the cycle time as a Date
+     */
+    private Date calCycleDateTime(Date date){
+        return cyclePeriod.format(date);
+    }
+
+    // Returns the current cycle id, advancing to the next cycle when date falls in a new one (second precision).
+    private long calCycle(Date date){
+        Date tmp = calCycleDateTime(date);
+        if(tmp.getTime()/1000 == curCycleDateTime.getTime()/1000){
+            return cycleId;
+        }
+        return nextCycle(tmp);
+    }
+
+    private long nextCycle(Date date){
+        curCycleDateTime = date;
+        cycleId++;
+        calAllPattern();
+        return cycleId;
+    }
+
+    // Recomputes the formatted dates of the previous getCycle() periods, newest first.
+    private void calAllPattern(){
+        allPatterns.clear();
+        SimpleDateFormat dateFormat = cyclePeriod.getDateFormat();
+        for(int i = 1; i <= cyclePeriod.getCycle(); i++){
+            long d = (curCycleDateTime.getTime()/1000)*1000 - i * cyclePeriod.getInterval() - cycleDiff;
+            allPatterns.add(dateFormat.format(new Date(d)));
+        }
+        // A cycle count of 0 produces no patterns; allPatterns.get(-1) would throw IndexOutOfBoundsException.
+        firstStartTime = allPatterns.isEmpty() ? null : allPatterns.get(allPatterns.size() - 1);
+    }
+
+    /**
+     * Reports whether {@code date} starts a new cycle. The first call always returns {@code true} and
+     * computes the initial patterns.
+     */
+    public boolean isNextCycle(Date date){
+        if(isInit){
+            isInit = false;
+            calAllPattern();
+            return true;
+        }
+        long tmp = cycleId;
+        return calCycle(date) > tmp;
+    }
+
+    public List<String> getAllPatterns() {
+        return allPatterns;
+    }
+
+    public long getCycleId() {
+        return cycleId;
+    }
+
+    public Date getCurCycleDateTime(){
+        return curCycleDateTime;
+    }
+
+    public String getCurCycleDateTimeStr(){
+        return cyclePeriod.getDateFormat().format(curCycleDateTime);
+    }
+
+    public long getCycleDiff() {
+        return cycleDiff;
+    }
+
+    public long getCyclePeriodDiff(){
+        return cycleDiff/cyclePeriod.getInterval();
+    }
+
+    public int getCycle(){
+        return cyclePeriod.getCycle();
+    }
+
+    public String getFirstStartTime() {
+        return firstStartTime;
+    }
+
+    @Override
+    public boolean filter(String sourceName, String logicTableName, String tableName) {
+        return allPatterns.contains(tableName);
+    }
+
+
+
+    // Ad-hoc manual check: prints results instead of asserting them.
+    public static void main(String[] args) throws ParseException {
+
+        CyclePatternFilter cycle = new CyclePatternFilter("yyyyMMddHHmm - 15m", new Date());
+        System.out.println(cycle);
+
+        System.out.println(cycle.filter(null, null, "202109131650"));
+        System.out.println(cycle.filter(null, null, "20210902000000"));
+        System.out.println(cycle.filter(null, null, "20210908000000"));
+        System.out.println(cycle.filter(null, null, "20210910000000"));
+        System.out.println(cycle.filter(null, null, "20210909230000"));
+
+        System.out.println(new SimpleDateFormat("yyyyMMddHH").parse("2021090923"));
+        System.out.println(new SimpleDateFormat("yyyyMMddhhmmss").parse("20210909230000"));
+        System.out.println(new SimpleDateFormat("yyyyMMddHHmmss").parse("20210909100000"));
+        System.out.println(new SimpleDateFormat("yyyyMMddhhmmss").parse("20210909100000"));
+
+
+
+
+    }
+
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
new file mode 100644
index 00000000..4e6cdd6a
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Granularity of a scheduling cycle (day, hour or minute), parsed from expressions of the form
+ * {@code "<format or history date> - <count><d|h|m>"}, e.g. {@code "yyyyMMddHHmm - 15m"} or {@code "20210917 - 1d"}.
+ *
+ * <p>NOTE(review): enum constants are JVM-wide singletons, yet {@link #getInstance(String)} and
+ * {@code argsParser} mutate their fields (format, interval, cycle, history flag and date). Two callers that
+ * parse different expressions with the same unit overwrite each other's settings.
+ */
+public enum CyclePeriod {
+
+    CYCLE_PERIOD_DATE() {
+        @Override
+        void argsParser(String expr) throws ParseException {
+            super.argsParser(expr);
+            interval = 24 * 3600 * 1000;
+            int length = expr.length();
+            if (length == 8 && checkFormat(expr, PatternFilter.yyyyMMdd)) {
+                format = PatternFilter.yyyyMMdd;
+            } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
+                format = PatternFilter.yyyyMMddHHmmss;
+            } else {
+                throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmdd 、 yyyymmddhhmmss.", expr));
+            }
+        }
+
+        // Truncates to local midnight; milliseconds are left untouched.
+        @Override
+        Date format(Date strDate) {
+            Date date = new Date(strDate.getTime());
+            date.setHours(0);
+            date.setMinutes(0);
+            date.setSeconds(0);
+            return date;
+        }
+
+    },
+    CYCLE_PERIOD_HOUR() {
+        @Override
+        void argsParser(String expr) throws ParseException {
+            super.argsParser(expr);
+            interval = 3600 * 1000;
+
+            int length = expr.length();
+            if (length == 10 && checkFormat(expr, PatternFilter.yyyyMMddHH)) {
+                format = PatternFilter.yyyyMMddHH;
+            } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
+                format = PatternFilter.yyyyMMddHHmmss;
+            } else {
+                throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmddhh 、 yyyymmddhhmmss.", expr));
+            }
+        }
+
+        // Truncates to the start of the hour; milliseconds are left untouched.
+        @Override
+        Date format(Date strDate) {
+            Date date = new Date(strDate.getTime());
+            date.setMinutes(0);
+            date.setSeconds(0);
+            return date;
+        }
+
+    },
+    CYCLE_PERIOD_MINUTE() {
+        @Override
+        void argsParser(String expr) throws ParseException {
+            super.argsParser(expr);
+            interval = 60 * 1000;
+            int length = expr.length();
+            if (length == 12 && checkFormat(expr, PatternFilter.yyyyMMddHHmm)) {
+                format = PatternFilter.yyyyMMddHHmm;
+            } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
+                format = PatternFilter.yyyyMMddHHmmss;
+            } else {
+                throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmddhhmm 、 yyyymmddhhmmss.", expr));
+            }
+        }
+
+        // Truncates to the start of the minute; milliseconds are left untouched.
+        @Override
+        Date format(Date strDate) {
+            Date date = new Date(strDate.getTime());
+            date.setSeconds(0);
+            return date;
+        }
+
+    };
+
+    boolean isHistory = false;
+
+    long interval;
+
+    int cycle;
+
+    String format;
+
+    String hisDateString;
+
+    static final Log logger = LogFactory.getLog(CyclePeriod.class);
+
+    /**
+     * Records whether {@code expr} is a concrete history date (digits only) instead of a format pattern.
+     * Both fields are reset on every call. Previously the flag was only ever set, so a history expression
+     * leaked into every later parse on the same shared enum constant.
+     */
+    void argsParser(String expr) throws ParseException {
+        isHistory = expr.matches("^\\d+$");
+        hisDateString = isHistory ? expr : null;
+    }
+
+    Date format(Date strDate) {
+        throw new RuntimeException(String.format("unsupported type.", strDate));
+    }
+
+    /**
+     * Checks {@code expr} against {@code format}. {@code expr} may be a pattern (e.g. yyyymmdd) or a
+     * concrete date (e.g. 20210917).
+     *
+     * @param expr   the left-hand side of the cycle expression
+     * @param format the candidate date pattern
+     * @return for a pattern, whether it equals {@code format} ignoring case; for a history date, whether it
+     *         parses with {@code format}
+     */
+    final boolean checkFormat(String expr, String format) {
+
+        if (!isHistory) {
+            return expr.equalsIgnoreCase(format);
+        }
+
+        try {
+            new SimpleDateFormat(format).parse(expr);
+            return true;
+        } catch (ParseException e) {
+            logger.error(String.format("error format, expr is %s, format is %s.", expr, format), e);
+            return false;
+        }
+    }
+
+    public Date getHisDate() throws ParseException {
+        return getDateFormat().parse(hisDateString);
+    }
+
+    public SimpleDateFormat getDateFormat() {
+        return new SimpleDateFormat(format);
+    }
+
+    public long getInterval() {
+        return interval;
+    }
+
+    public boolean isHistory() {
+        return isHistory;
+    }
+
+    public void setHistory(boolean history) {
+        isHistory = history;
+    }
+
+    public void setInterval(long interval) {
+        this.interval = interval;
+    }
+
+    public int getCycle() {
+        return cycle;
+    }
+
+    public void setCycle(int cycle) {
+        this.cycle = cycle;
+    }
+
+    public void setFormat(String format) {
+        this.format = format;
+    }
+
+    public String getFormat() {
+        return format;
+    }
+
+    public String getHisDateString() {
+        return hisDateString;
+    }
+
+    public void setHisDateString(String hisDateString) {
+        this.hisDateString = hisDateString;
+    }
+
+    /**
+     * Parses {@code "<format or date> - <count><d|h|m>"} and configures the matching (shared) constant.
+     *
+     * @param expression the cycle expression
+     * @return the configured period
+     * @throws ParseException    propagated from argument parsing
+     * @throws RuntimeException  if the expression does not have exactly two parts, or the unit is not d/h/m
+     */
+    public static CyclePeriod getInstance(String expression) throws ParseException {
+
+        String[] str = expression.split("\\-");
+        // Explicit check: an assert is skipped unless the JVM runs with -ea.
+        if (str.length != 2) {
+            throw new RuntimeException(String.format("expression error : %s. ", expression));
+        }
+        String expr = str[0].trim();
+        String tmp = str[1].trim().toLowerCase();
+        String cycleStr = tmp.substring(0, tmp.length() - 1);
+        int cycle = Integer.parseInt(cycleStr);
+        CyclePeriod cyclePeriod;
+        if (tmp.endsWith("d")) {
+            cyclePeriod = CYCLE_PERIOD_DATE;
+        } else if (tmp.endsWith("h")) {
+            cyclePeriod = CYCLE_PERIOD_HOUR;
+        } else if (tmp.endsWith("m")) {
+            cyclePeriod = CYCLE_PERIOD_MINUTE;
+        } else {
+            // Previously the exception was constructed but never thrown, which surfaced as an NPE below.
+            throw new RuntimeException(String.format("unsupported format : %s", expression));
+        }
+        cyclePeriod.argsParser(expr);
+        cyclePeriod.cycle = cycle;
+
+        return cyclePeriod;
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
new file mode 100644
index 00000000..ba9a2797
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+
+/**
+ * Partition selection by scheduling cycle. Each {@link #nextCycle(Date)} call produces a {@link Cycle}
+ * holding the formatted dates of the previous {@code cycle} periods.
+ */
+public class CycleSchedule implements Serializable {
+
+    private static final long serialVersionUID = -5151597286296228754L;
+    public static final int INIT_CYCLE_VERSION = 0;
+    // volatile is required for the double-checked locking in getInstance to publish a fully built object
+    private static volatile CycleSchedule INSTANCE;
+    CyclePeriod cyclePeriod;
+    AtomicLong cycleId = new AtomicLong(0);
+    String expression;
+    boolean isInit;
+    // Used when reading history data: offset in milliseconds between the current cycle time and the history date.
+    final long cycleDiff;
+
+    /**
+     * @param expr cycle expression, parsed by {@link CyclePeriod#getInstance}
+     * @param date reference time (milliseconds are dropped)
+     * @throws ParseException if a history date in {@code expr} cannot be parsed
+     */
+    public CycleSchedule(String expr, Date date) throws ParseException {
+        Date local = subMs(date);
+        expression = expr;
+        cycleId.set(INIT_CYCLE_VERSION);
+        cyclePeriod = CyclePeriod.getInstance(expression);
+        isInit = true;
+        if (cyclePeriod.isHistory) {
+            Date curCycleDateTime = calCycleDateTime(local);
+            Date tmp = subMs(cyclePeriod.getHisDate());
+            cycleDiff = curCycleDateTime.getTime() - tmp.getTime();
+        } else {
+            cycleDiff = 0;
+        }
+    }
+
+    /**
+     * Drops the millisecond part of a timestamp.
+     *
+     * @param date the time to truncate
+     * @return a new Date truncated to whole seconds
+     */
+    private Date subMs(Date date) {
+        long time = date.getTime() / 1000 * 1000;
+        return new Date(time);
+    }
+
+    /**
+     * @return the start of the cycle containing {@code date}
+     */
+    private Date calCycleDateTime(Date date) {
+        return cyclePeriod.format(date);
+    }
+
+    /**
+     * Builds the cycle for {@code date}. The first call keeps the initial cycle id; later calls increment it.
+     *
+     * @param date time inside the requested cycle
+     * @return the cycle description, including its patterns
+     */
+    public Cycle nextCycle(Date date) {
+        Date local = subMs(date);
+        local = cyclePeriod.format(local);
+        if (isInit) {
+            isInit = false;
+        } else {
+            cycleId.incrementAndGet();
+        }
+        List<String> ret = calAllPattern(local);
+        Cycle cycle = new Cycle();
+        cycle.setCycleId(cycleId.get());
+        cycle.setAllPattern(ret);
+        cycle.setCycleDateStr(calCycleDateStr(local));
+        cycle.setCycleCount(cyclePeriod.getCycle());
+        cycle.setCurDateStr(cyclePeriod.getDateFormat().format(local));
+        cycle.setCycleDiff(cycleDiff);
+        return cycle;
+    }
+
+    private String calCycleDateStr(Date date) {
+        long d = date.getTime() - cycleDiff;
+        Date d1 = new Date(d);
+        return cyclePeriod.getDateFormat().format(d1);
+    }
+
+    // Formatted dates of the previous getCycle() periods before date (shifted by cycleDiff), newest first.
+    private List<String> calAllPattern(Date date) {
+        List<String> allPatterns = new ArrayList<>();
+        java.text.SimpleDateFormat dateFormat = cyclePeriod.getDateFormat();
+        for (int i = 1; i <= cyclePeriod.getCycle(); i++) {
+            long d = date.getTime() - i * cyclePeriod.getInterval() - cycleDiff;
+            allPatterns.add(dateFormat.format(new Date(d)));
+        }
+        return allPatterns;
+    }
+
+    public CyclePeriod getCyclePeriod() {
+        return cyclePeriod;
+    }
+
+    public void setCyclePeriod(CyclePeriod cyclePeriod) {
+        this.cyclePeriod = cyclePeriod;
+    }
+
+    public AtomicLong getCycleId() {
+        return cycleId;
+    }
+
+    public void setCycleId(AtomicLong cycleId) {
+        this.cycleId = cycleId;
+    }
+
+    public String getExpression() {
+        return expression;
+    }
+
+    public void setExpression(String expression) {
+        this.expression = expression;
+    }
+
+    public boolean isInit() {
+        return isInit;
+    }
+
+    public void setInit(boolean init) {
+        isInit = init;
+    }
+
+    public long getCycleDiff() {
+        return cycleDiff;
+    }
+
+    /**
+     * Returns the process-wide instance, creating it from {@code expr} and {@code date} on first use.
+     * Arguments passed on later calls are ignored. If the first creation fails to parse, the error is only
+     * printed and {@code null} is returned.
+     */
+    public static CycleSchedule getInstance(String expr, Date date) {
+        if (INSTANCE == null) {
+            synchronized (CycleSchedule.class) {
+                if (INSTANCE == null) {
+                    try {
+                        INSTANCE = new CycleSchedule(expr, date);
+                    } catch (ParseException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    /** Snapshot of one scheduling cycle produced by {@link CycleSchedule#nextCycle(Date)}. */
+    public static class Cycle extends BasedConfigurable implements Serializable {
+
+        private static final long serialVersionUID = 4842560538716388622L;
+        Long cycleId;
+        List<String> allPattern;
+        String cycleDateStr;
+        Integer cycleCount;
+        String curDateStr;
+        long cycleDiff;
+
+        public Integer getCycleCount() {
+            return cycleCount;
+        }
+
+        public void setCycleCount(Integer cycleCount) {
+            this.cycleCount = cycleCount;
+        }
+
+        public Cycle() {
+        }
+
+        public Long getCycleId() {
+            return cycleId;
+        }
+
+        public void setCycleId(Long cycleId) {
+            this.cycleId = cycleId;
+        }
+
+        public List<String> getAllPattern() {
+            return allPattern;
+        }
+
+        public void setAllPattern(List<String> allPattern) {
+            this.allPattern = allPattern;
+        }
+
+        public String getCycleDateStr() {
+            return cycleDateStr;
+        }
+
+        public void setCycleDateStr(String cycleDateStr) {
+            this.cycleDateStr = cycleDateStr;
+        }
+
+        public String getCurDateStr() {
+            return curDateStr;
+        }
+
+        public void setCurDateStr(String curDateStr) {
+            this.curDateStr = curDateStr;
+        }
+
+        public long getCycleDiff() {
+            return cycleDiff;
+        }
+
+        public void setCycleDiff(long cycleDiff) {
+            this.cycleDiff = cycleDiff;
+        }
+
+        // Null-safe: a Cycle created with the no-arg constructor has no patterns yet.
+        @Override
+        public String toString() {
+            return "Cycle{" +
+                "cycleId=" + cycleId +
+                ", cycleDateStr='" + cycleDateStr + '\'' +
+                ", cycleCount=" + cycleCount +
+                ", curDateStr='" + curDateStr + '\'' +
+                ", cycleDiff=" + cycleDiff +
+                ", allPattern=" + (allPattern == null ? "null" : Arrays.toString(allPattern.toArray())) +
+                '}';
+        }
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleScheduleFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleScheduleFilter.java
new file mode 100644
index 00000000..507739d8
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleScheduleFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.source.filter; + +import java.io.Serializable; +import java.util.List; + +/** + * @description + */ +public class CycleScheduleFilter extends AbstractPatternFilter implements Serializable { + + List<String> allPattern; + + public CycleScheduleFilter(List<String> allPattern){ + this.allPattern = allPattern; + } + + @Override + public boolean filter(String sourceName, String logicTableName, String tableName) { + return allPattern.contains(tableName); + } +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java new file mode 100644 index 00000000..0cdc0762 --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.connectors.source.filter; + +import java.io.Serializable; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @description + */ +public class DataFormatPatternFilter extends AbstractPatternFilter implements Serializable { + + private static final long serialVersionUID = 3604787588465242642L; + + static final Log logger = LogFactory.getLog(DataFormatPatternFilter.class); + + static final String yyyyMMddHHmmss = "yyyyMMddHHmmss"; + static final String yyyyMMdd = "yyyyMMdd"; + static final String yyyyMMddHH = "yyyyMMddHH"; + + SimpleDateFormat format1 = new SimpleDateFormat(yyyyMMdd); + SimpleDateFormat format2 = new SimpleDateFormat(yyyyMMddHH); + SimpleDateFormat format3 = new SimpleDateFormat(yyyyMMddHHmmss); + + @Override + public boolean filter(String sourceName, String logicTableName, String tableNameSuffix) { + + int len = tableNameSuffix.length(); + boolean isFilter = false; + + switch (len) { + case 8: + try { + format1.parse(tableNameSuffix); + isFilter = true; + } catch (ParseException e) { + e.printStackTrace(); + isFilter = false; + } + break; + case 10: + try { + format2.parse(tableNameSuffix); + isFilter = true; + } catch (ParseException e) { + e.printStackTrace(); + isFilter = false; + } + break; + case 14: + try { + 
format3.parse(tableNameSuffix); + isFilter = true; + } catch (ParseException e) { + e.printStackTrace(); + isFilter = false; + } + break; + } + + if (isFilter) { + logger.info(String.format("filter sourceName %s, logicTableName %s, suffix %s", sourceName, logicTableName, tableNameSuffix)); + return true; + } + if (next != null) { + return next.filter(sourceName, logicTableName, tableNameSuffix); + } + return false; + } + + @Override + public PatternFilter setNext(PatternFilter filter) { + super.setNext(filter); + return this; + } + + public PatternFilter getNext() { + return next; + } + + public static void main(String[] args) { + DataFormatPatternFilter filter = new DataFormatPatternFilter(); +// System.out.println(filter.filter("20200101")); +// System.out.println(filter.filter("2020010101")); +// System.out.println(filter.filter("20200101010101")); + + } + +} diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java new file mode 100644 index 00000000..42365007 --- /dev/null +++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.streams.connectors.source.filter;

/**
 * Decides whether a table matches, given the source name, the logical table name and the table
 * name. Filters can be linked through {@link #setNext(PatternFilter)}.
 */
public interface PatternFilter {

    // Date pattern strings, from coarsest to finest granularity.
    String yyyyMMdd = "yyyyMMdd";
    String yyyyMMddHH = "yyyyMMddHH";
    String yyyyMMddHHmm = "yyyyMMddHHmm";
    String yyyyMMddHHmmss = "yyyyMMddHHmmss";

    /**
     * Checks whether the table matches, based on the source name and table name.
     *
     * @param sourceName     name of the source
     * @param logicTableName logical table name
     * @param tableName      table name to test (DataFormatPatternFilter treats it as a date suffix)
     * @return {@code true} if the table matches
     */
    boolean filter(String sourceName, String logicTableName, String tableName);

    /**
     * Sets the next filter in the chain.
     *
     * @param filter the filter to link after this one
     * @return a filter for fluent chaining (DataFormatPatternFilter returns itself)
     */
    PatternFilter setNext(PatternFilter filter);

}
// diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
// index cf09bf48..4aa86ae4 100644
// --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
// +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
// @@ -127,7 +127,7 @@ public class WindowMaxValueProcessor {
//          }
//
//          String keyPrefix = MapKeyUtil.createKey(name, splitId);
// -        String sql="select * from "+ ORMUtil.getTableName(WindowMaxValue.class)+ " where msg_key like '"+keyPrefix+"%'";
// +        String sql = "select * from " + ORMUtil.getTableName(WindowMaxValue.class) + " where configure_name like '%" + name + "%' and partition like '%" + splitId + "%'";
//          List<WindowMaxValue> windowMaxValues = ORMUtil.queryForList(sql, null, WindowMaxValue.class);
//          if (windowMaxValues == null || windowMaxValues.size() == 0) {
//              return result;
//
// NOTE(review), on the hunk above:
//  - name and splitId are concatenated into SQL; if they can carry untrusted text, bind them
//    as parameters instead.
//  - like '%x%' is a substring match, so splitId "1" also matches "10", "11", ... and one
//    configure name can match another that contains it. Confirm this is intended.
//  - PARTITION is a reserved word in MySQL; confirm the column needs quoting on the target DB.
