This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 6a02c94 Downgrading ZK to stable version 3.4.13 (#2473) 6a02c94 is described below commit 6a02c9434fb3dbb821d0ce6766f0ba49f00565e2 Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Sep 6 10:58:17 2018 -0700 Downgrading ZK to stable version 3.4.13 (#2473) * Downgrading ZK to stable version 3.4.13 * Added integration test for ZK server downgrade * Fixed ZookeeperClientFactoryImplTest.testZKCreationFailure * Fixed dependencies version in license file * Addressed comments * Fixed jline version in license file * There are 2 jline jars to count --- distribution/server/licenses/LICENSE-JLine.txt | 29 +++++ distribution/server/src/assemble/LICENSE.bin.txt | 43 +++---- pom.xml | 8 +- pulsar-zookeeper-utils/pom.xml | 57 +++++++++ .../pulsar/zookeeper/FileTxnSnapLogWrapper.java | 60 ---------- .../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 2 +- .../pulsar/zookeeper/SerializeUtilsAspect.java | 130 +++++++++++++++++++++ .../zookeeper/ZookeeperBkClientFactoryImpl.java | 7 +- .../zookeeper/ZookeeperClientFactoryImplTest.java | 12 +- .../pulsar/zookeeper/ZooKeeperServerAspect.java | 16 --- .../pulsar/tests/topologies/PulsarCluster.java | 13 +++ .../pulsar/tests/topologies/PulsarClusterSpec.java | 9 +- .../tests/upgrade/PulsarZKDowngradeTest.java | 115 ++++++++++++++++++ .../src/main/resources/zk-3.5-test-data/log.1 | Bin 0 -> 20480 bytes .../src/main/resources/zk-3.5-test-data/log.85 | Bin 0 -> 1024 bytes .../src/main/resources/zk-3.5-test-data/snapshot.0 | Bin 0 -> 424 bytes .../main/resources/zk-3.5-test-data/snapshot.84 | Bin 0 -> 10747 bytes 17 files changed, 394 insertions(+), 107 deletions(-) diff --git a/distribution/server/licenses/LICENSE-JLine.txt b/distribution/server/licenses/LICENSE-JLine.txt new file mode 100644 index 0000000..9a34d43 --- /dev/null +++ b/distribution/server/licenses/LICENSE-JLine.txt @@ -0,0 +1,29 @@ +Redistribution and use in source and binary forms, with or 
+without modification, are permitted provided that the following +conditions are met: + +Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + +Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with +the distribution. + +Neither the name of JLine nor the names of its contributors +may be used to endorse or promote products derived from this +software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, +BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY +AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO +EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, +OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING +IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index d813041..bef4498 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -370,27 +370,27 @@ The Apache Software License, Version 2.0 - org.apache.logging.log4j-log4j-web-2.10.0.jar * Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar * BookKeeper - - org.apache.bookkeeper-bookkeeper-common-4.7.1.jar - - org.apache.bookkeeper-bookkeeper-proto-4.7.1.jar - - org.apache.bookkeeper-bookkeeper-server-4.7.1.jar - - org.apache.bookkeeper-circe-checksum-4.7.1.jar - - org.apache.bookkeeper-statelib-4.7.1.jar - - org.apache.bookkeeper-stream-storage-api-4.7.1.jar - - org.apache.bookkeeper-stream-storage-common-4.7.1.jar - - org.apache.bookkeeper-stream-storage-java-client-4.7.1.jar - - org.apache.bookkeeper-stream-storage-java-client-base-4.7.1.jar - - org.apache.bookkeeper-stream-storage-proto-4.7.1.jar - - org.apache.bookkeeper-stream-storage-server-4.7.1.jar - - org.apache.bookkeeper-stream-storage-service-api-4.7.1.jar - - org.apache.bookkeeper-stream-storage-service-impl-4.7.1.jar - - org.apache.bookkeeper.http-http-server-4.7.1.jar - - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.7.1.jar - - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.7.1.jar - - org.apache.bookkeeper.tests-stream-storage-tests-common-4.7.1.jar - - org.apache.distributedlog-distributedlog-common-4.7.1.jar - - org.apache.distributedlog-distributedlog-core-4.7.1-tests.jar - - org.apache.distributedlog-distributedlog-core-4.7.1.jar - - org.apache.distributedlog-distributedlog-protocol-4.7.1.jar + - org.apache.bookkeeper-bookkeeper-common-4.7.2.jar + - org.apache.bookkeeper-bookkeeper-proto-4.7.2.jar + - org.apache.bookkeeper-bookkeeper-server-4.7.2.jar + - org.apache.bookkeeper-circe-checksum-4.7.2.jar + - org.apache.bookkeeper-statelib-4.7.2.jar + - org.apache.bookkeeper-stream-storage-api-4.7.2.jar 
+ - org.apache.bookkeeper-stream-storage-common-4.7.2.jar + - org.apache.bookkeeper-stream-storage-java-client-4.7.2.jar + - org.apache.bookkeeper-stream-storage-java-client-base-4.7.2.jar + - org.apache.bookkeeper-stream-storage-proto-4.7.2.jar + - org.apache.bookkeeper-stream-storage-server-4.7.2.jar + - org.apache.bookkeeper-stream-storage-service-api-4.7.2.jar + - org.apache.bookkeeper-stream-storage-service-impl-4.7.2.jar + - org.apache.bookkeeper.http-http-server-4.7.2.jar + - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.7.2.jar + - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.7.2.jar + - org.apache.bookkeeper.tests-stream-storage-tests-common-4.7.2.jar + - org.apache.distributedlog-distributedlog-common-4.7.2.jar + - org.apache.distributedlog-distributedlog-core-4.7.2-tests.jar + - org.apache.distributedlog-distributedlog-core-4.7.2.jar + - org.apache.distributedlog-distributedlog-protocol-4.7.2.jar * LZ4 -- net.jpountz.lz4-lz4-1.3.0.jar * AsyncHttpClient - org.asynchttpclient-async-http-client-2.1.0-alpha26.jar @@ -467,6 +467,7 @@ BSD 3-clause "New" or "Revised" License - com.ea.agentloader-ea-agent-loader-1.0.2.jar -- licenses/LICENSE-EA-Agent-Loader.txt * Google auth library - com.google.auth-google-auth-library-credentials-0.9.0.jar -- licenses/LICENSE-google-auth-library.txt + * JLine -- jline-jline-0.9.94.jar -- licenses/LICENSE-JLine.txt * LevelDB -- (included in org.rocksdb.*.jar) -- licenses/LICENSE-LevelDB.txt * JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- licenses/LICENSE-JSR305.txt diff --git a/pom.xml b/pom.xml index e04cce4..6e26511 100644 --- a/pom.xml +++ b/pom.xml @@ -128,8 +128,8 @@ flexible messaging model and an intuitive client API.</description> <!-- apache commons --> <commons-compress.version>1.15</commons-compress.version> - <bookkeeper.version>4.7.1</bookkeeper.version> - <zookeeper.version>3.5.4-beta</zookeeper.version> + <bookkeeper.version>4.7.2</bookkeeper.version> + 
<zookeeper.version>3.4.13</zookeeper.version> <netty.version>4.1.22.Final</netty.version> <storm.version>1.0.5</storm.version> <jetty.version>9.3.11.v20160721</jetty.version> @@ -1006,6 +1006,7 @@ flexible messaging model and an intuitive client API.</description> <exclude>generated-site/**</exclude> <exclude>.github/*.md</exclude> <exclude>**/.idea/*</exclude> + <exclude>**/zk-3.5-test-data/*</exclude> </excludes> <mapping> <proto>JAVADOC_STYLE</proto> @@ -1108,6 +1109,9 @@ flexible messaging model and an intuitive client API.</description> <exclude>certificate-authority/index.txt</exclude> <exclude>certificate-authority/README.md</exclude> + <!-- Exclude ZK test data file --> + <exclude>**/zk-3.5-test-data/*</exclude> + <!-- Python requirements files --> <exclude>**/requirements.txt</exclude> diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml index 4234feb..6f496b4 100644 --- a/pulsar-zookeeper-utils/pom.xml +++ b/pulsar-zookeeper-utils/pom.xml @@ -73,6 +73,17 @@ </dependency> <dependency> + <groupId>org.aspectj</groupId> + <artifactId>aspectjrt</artifactId> + </dependency> + + <dependency> + <groupId>org.aspectj</groupId> + <artifactId>aspectjweaver</artifactId> + </dependency> + + + <dependency> <groupId>${project.groupId}</groupId> <artifactId>managed-ledger-original</artifactId> <version>${project.parent.version}</version> @@ -102,7 +113,53 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>aspectj-maven-plugin</artifactId> + <configuration> + <complianceLevel>1.8</complianceLevel> + <source>1.8</source> + <target>1.8</target> + <showWeaveInfo>true</showWeaveInfo> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> + <pluginManagement> + <plugins> + <!--This plugin's configuration is used to store Eclipse m2e settings only. 
It has no influence on the Maven build itself.--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.codehaus.mojo</groupId> + <artifactId>aspectj-maven-plugin</artifactId> + <versionRange>[1.10,)</versionRange> + <goals> + <goal>compile</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore></ignore> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> </build> </project> diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java deleted file mode 100644 index 45b9b78..0000000 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pulsar.zookeeper; - -import java.io.File; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import lombok.extern.slf4j.Slf4j; - -import org.apache.zookeeper.server.DataTree; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; - -@Slf4j -public class FileTxnSnapLogWrapper extends FileTxnSnapLog { - - public FileTxnSnapLogWrapper(FileTxnSnapLog src) throws IOException { - this(src.getDataDir(), src.getSnapDir()); - } - - public FileTxnSnapLogWrapper(File dataDir, File snapDir) throws IOException { - super(dataDir, snapDir); - } - - @Override - public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { - try { - return super.restore(dt, sessions, listener); - } catch (IOException e) { - if ("No snapshot found, but there are log entries. Something is broken!".equals(e.getMessage())) { - log.info("Ignoring exception for missing ZK db"); - // Ignore error when snapshot is not found. 
This is needed when upgrading ZK from 3.4 to 3.5 - // https://issues.apache.org/jira/browse/ZOOKEEPER-3056 - save(dt, (ConcurrentHashMap<Long, Integer>) sessions); - - /* return a zxid of zero, since we the database is empty */ - return 0; - } else { - throw e; - } - } - } -} diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index f6ab92c..f7923e4 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -154,7 +154,7 @@ public class LocalBookkeeperEnsemble { try { // Allow all commands on ZK control port System.setProperty("zookeeper.4lw.commands.whitelist", "*"); - zks = new ZooKeeperServer(new FileTxnSnapLogWrapper(zkDataDir, zkDataDir)); + zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME); serverFactory = new NIOServerCnxnFactory(); serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), maxCC); diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java new file mode 100644 index 0000000..e7eb3cc --- /dev/null +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.zookeeper; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; + +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.InputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.txn.CreateSessionTxn; +import org.apache.zookeeper.txn.CreateTxn; +import org.apache.zookeeper.txn.CreateTxnV0; +import org.apache.zookeeper.txn.DeleteTxn; +import org.apache.zookeeper.txn.ErrorTxn; +import org.apache.zookeeper.txn.MultiTxn; +import org.apache.zookeeper.txn.SetACLTxn; +import org.apache.zookeeper.txn.SetDataTxn; +import org.apache.zookeeper.txn.TxnHeader; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; + +@Aspect +public class SerializeUtilsAspect { + @Around("execution(* org.apache.zookeeper.server.util.SerializeUtils.deserializeTxn(..))") + public Record wrapper(ProceedingJoinPoint joinPoint) throws IOException { + byte[] txnBytes = (byte[]) joinPoint.getArgs()[0]; + TxnHeader hdr = (TxnHeader) joinPoint.getArgs()[1]; + + return deserializeTxn(txnBytes, hdr); + } + + private static final int CREATE2 = 15; + private static final int CREATE_CONTAINER = 19; + private static final int DELETE_CONTAINER = 20; + + /** + * Copied `deserializeTxn()` from ZK-3.4.13 + * + * https://github.com/apache/zookeeper/blob/release-3.4.13/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java#L54 + * + * With addition for 
handling `create2`, `createContainer` and `deleteContainer` transactions when downgrading from + * 3.5.x to 3.4.x + */ + public static Record deserializeTxn(byte txnBytes[], TxnHeader hdr) + throws IOException { + final ByteArrayInputStream bais = new ByteArrayInputStream(txnBytes); + InputArchive ia = BinaryInputArchive.getArchive(bais); + + hdr.deserialize(ia, "hdr"); + bais.mark(bais.available()); + Record txn = null; + switch (hdr.getType()) { + case OpCode.createSession: + // This isn't really an error txn; it just has the same + // format. The error represents the timeout + txn = new CreateSessionTxn(); + break; + case OpCode.closeSession: + return null; + case OpCode.create: + case CREATE2: // create2 is treated as a create op in 3.5.x + case CREATE_CONTAINER: // createContainer can be treated as a regular create operation, since ZK 3.4 doesn't + // have support for auto cleaning the empty directories + txn = new CreateTxn(); + break; + case OpCode.delete: + case DELETE_CONTAINER: // deleteContainer is treated as regular delete in 3.5.x + txn = new DeleteTxn(); + break; + case OpCode.setData: + txn = new SetDataTxn(); + break; + case OpCode.setACL: + txn = new SetACLTxn(); + break; + case OpCode.error: + txn = new ErrorTxn(); + break; + case OpCode.multi: + txn = new MultiTxn(); + break; + default: + throw new IOException("Unsupported Txn with type=%d" + hdr.getType()); + } + if (txn != null) { + try { + txn.deserialize(ia, "txn"); + } catch(EOFException e) { + // perhaps this is a V0 Create + if (hdr.getType() == OpCode.create) { + CreateTxn create = (CreateTxn)txn; + bais.reset(); + CreateTxnV0 createv0 = new CreateTxnV0(); + createv0.deserialize(ia, "txn"); + // cool now make it V1. 
a -1 parentCVersion will + // trigger fixup processing in processTxn + create.setPath(createv0.getPath()); + create.setData(createv0.getData()); + create.setAcl(createv0.getAcl()); + create.setEphemeral(createv0.getEphemeral()); + create.setParentCVersion(-1); + } else { + throw e; + } + } + } + return txn; + } + +} diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java index d18aea9..aac96ce 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java @@ -23,18 +23,19 @@ import static org.apache.bookkeeper.util.SafeRunnable.safeRun; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import lombok.extern.slf4j.Slf4j; - import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -@Slf4j public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory { + private static final Logger log = LoggerFactory.getLogger(ZookeeperBkClientFactoryImpl.class); + private final OrderedExecutor executor; public ZookeeperBkClientFactoryImpl(OrderedExecutor executor) { diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java index 74f3970..7331471 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java +++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java @@ -20,13 +20,13 @@ package org.apache.pulsar.zookeeper; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.bookkeeper.test.PortManager; -import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; -import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; @@ -79,6 +79,12 @@ public class ZookeeperClientFactoryImplTest { ZooKeeperClientFactory zkf = new ZookeeperClientFactoryImpl(); CompletableFuture<ZooKeeper> zkFuture = zkf.create("invalid", SessionType.ReadWrite, (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS); - assertTrue(zkFuture.isCompletedExceptionally()); + + try { + zkFuture.get(3, TimeUnit.SECONDS); + fail("Should have thrown exception"); + } catch (TimeoutException e) { + // Expected + } } } diff --git a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java index c7642f3..24919be 100644 --- a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java +++ b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java @@ -20,14 +20,9 @@ package org.apache.pulsar.zookeeper; import io.prometheus.client.Gauge; -import java.util.Arrays; - import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.aspectj.lang.JoinPoint; -import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.After; -import 
org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; @@ -42,17 +37,6 @@ public class ZooKeeperServerAspect { public void zkServerConstructorPointCut() { } - @Around("zkServerConstructorPointCut()") - public void zkServerConstructorBefore(ProceedingJoinPoint joinPoint) throws Throwable { - Object[] args = joinPoint.getArgs(); - if (args[0] instanceof FileTxnSnapLog) { - // Wrap FileTxnSnapLog argument - args[0] = new FileTxnSnapLogWrapper((FileTxnSnapLog)args[0]); - } - - joinPoint.proceed(args); - } - @After("zkServerConstructorPointCut()") public void zkServerConstructor(JoinPoint joinPoint) throws Throwable { // ZooKeeperServer instance was created diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java index 331d7e2..f2151ef 100644 --- a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java +++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java @@ -36,6 +36,7 @@ import java.util.stream.Stream; import com.google.common.collect.Streams; import lombok.Getter; import lombok.extern.slf4j.Slf4j; + import org.apache.pulsar.tests.containers.BKContainer; import org.apache.pulsar.tests.containers.BrokerContainer; import org.apache.pulsar.tests.containers.CSContainer; @@ -44,6 +45,9 @@ import org.apache.pulsar.tests.containers.PulsarContainer; import org.apache.pulsar.tests.containers.WorkerContainer; import org.apache.pulsar.tests.containers.ZKContainer; import org.testcontainers.containers.Container.ExecResult; + +import org.testcontainers.containers.BindMode; + import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; @@ -134,6 +138,15 @@ public class PulsarCluster { ) ); + 
spec.classPathVolumeMounts.entrySet().forEach(e -> { + zkContainer.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE); + proxyContainer.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE); + + bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE)); + brokerContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE)); + workerContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE)); + }); + } public String getPlainTextServiceUrl() { diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java index ceb30b5..96133b3 100644 --- a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java +++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java @@ -20,12 +20,14 @@ package org.apache.pulsar.tests.topologies; import java.util.Collections; import java.util.Map; +import java.util.TreeMap; + import lombok.Builder; import lombok.Builder.Default; import lombok.Getter; import lombok.Setter; import lombok.experimental.Accessors; -import org.apache.pulsar.tests.containers.ChaosContainer; + import org.testcontainers.containers.GenericContainer; /** @@ -100,4 +102,9 @@ public class PulsarClusterSpec { @Default boolean enableContainerLog = false; + /** + * Provide a map of paths (in the classpath) to mount as volumes inside the containers + */ + @Builder.Default + Map<String, String> classPathVolumeMounts = new TreeMap<>(); } diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/upgrade/PulsarZKDowngradeTest.java 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/upgrade/PulsarZKDowngradeTest.java new file mode 100644 index 0000000..6005aa7 --- /dev/null +++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/upgrade/PulsarZKDowngradeTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.tests.integration.upgrade; + +import static java.util.stream.Collectors.joining; +import static org.testng.Assert.assertEquals; + +import com.google.common.collect.ImmutableMap; + +import java.util.stream.Stream; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +/** + * Test downgrading ZK from 3.5.x to 3.4.x. This is part of the upgrade from Pulsar 2.1.0 to 2.1.1. + */ +@Slf4j +public class PulsarZKDowngradeTest extends PulsarClusterTestBase { + + protected static final int ENTRIES_PER_LEDGER = 1024; + + @BeforeSuite + @Override + public void setupCluster() throws Exception { + final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5)) + .filter(s -> s != null && !s.isEmpty()) + .collect(joining("-")); + + PulsarClusterSpec spec = PulsarClusterSpec.builder() + .numBookies(2) + .numBrokers(1) + .clusterName(clusterName) + .classPathVolumeMounts( + ImmutableMap.<String, String> builder() + .put("zk-3.5-test-data", "/pulsar/data/zookeeper/version-2/version-2") + .build()) + .build(); + + log.info("Setting up cluster {} with {} bookies, {} brokers", + spec.clusterName(), spec.numBookies(), spec.numBrokers()); + + pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + + log.info("Cluster {} is setup", spec.clusterName()); + } + + @AfterSuite + @Override + public void tearDownCluster() { + 
super.tearDownCluster(); + } + + @Test(dataProvider = "ServiceUrlAndTopics") + public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { + String topicName = generateTopicName("testpubconsume", isPersistent); + + int numMessages = 10; + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(serviceUrl) + .build(); + + @Cleanup + Consumer<String> consumer = client.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName("my-sub") + .subscribe(); + + @Cleanup + Producer<String> producer = client.newProducer(Schema.STRING) + .topic(topicName) + .create(); + + for (int i = 0; i < numMessages; i++) { + producer.send("smoke-message-" + i); + } + + for (int i = 0; i < numMessages; i++) { + Message<String> m = consumer.receive(); + assertEquals("smoke-message-" + i, m.getValue()); + } + + } +} diff --git a/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.1 b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.1 new file mode 100644 index 0000000..6d5ae52 Binary files /dev/null and b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.1 differ diff --git a/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.85 b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.85 new file mode 100644 index 0000000..4ecb63b Binary files /dev/null and b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.85 differ diff --git a/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.0 b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.0 new file mode 100644 index 0000000..3e6deee Binary files /dev/null and b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.0 differ diff --git a/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.84 
b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.84 new file mode 100644 index 0000000..3520b95 Binary files /dev/null and b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.84 differ