Script 'mail_helper' called by obssrc Hello community, here is the log from the commit of package librdkafka for openSUSE:Factory checked in at 2022-01-20 00:11:52 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/librdkafka (Old) and /work/SRC/openSUSE:Factory/.librdkafka.new.1892 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "librdkafka" Thu Jan 20 00:11:52 2022 rev:13 rq:946807 version:1.8.2 Changes: -------- --- /work/SRC/openSUSE:Factory/librdkafka/librdkafka.changes 2021-10-31 22:56:11.927723914 +0100 +++ /work/SRC/openSUSE:Factory/.librdkafka.new.1892/librdkafka.changes 2022-01-20 00:12:03.470565756 +0100 @@ -1,0 +2,10 @@ +Tue Jan 4 22:51:56 UTC 2022 - Dirk M??ller <dmuel...@suse.com> + +- update to 1.8.2: + * Added ssl.ca.pem to add CA certificate by PEM string + * Upon quick repeated leader changes the transactional producer could receive + an OUT_OF_ORDER_SEQUENCE error from the broker + * The transactional producer could stall during a transaction if the transaction + coordinator changed + +------------------------------------------------------------------- Old: ---- v1.8.0.tar.gz New: ---- v1.8.2.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ librdkafka.spec ++++++ --- /var/tmp/diff_new_pack.58uk2n/_old 2022-01-20 00:12:07.390568921 +0100 +++ /var/tmp/diff_new_pack.58uk2n/_new 2022-01-20 00:12:07.394568924 +0100 @@ -1,7 +1,7 @@ # # spec file for package librdkafka # -# Copyright (c) 2021 SUSE LLC +# Copyright (c) 2022 SUSE LLC # # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -23,7 +23,7 @@ %define _lto_cflags %{nil} %endif Name: librdkafka -Version: 1.8.0 +Version: 1.8.2 Release: 0 Summary: The Apache Kafka C/C++ library License: BSD-2-Clause ++++++ v1.8.0.tar.gz -> v1.8.2.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/.travis.yml new/librdkafka-1.8.2/.travis.yml --- old/librdkafka-1.8.0/.travis.yml 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/.travis.yml 2021-10-11 22:15:44.000000000 +0200 @@ -1,5 +1,5 @@ language: c -dist: trusty +dist: xenial cache: ccache addons: @@ -32,7 +32,7 @@ compiler: gcc env: ADDITIONAL_BUILDS="centos" SKIP_MAKE=y - - name: "Linux clang: +alpine +manylinux" + - name: "Linux clang: +alpine +manylinux +werror" os: linux compiler: clang env: ADDITIONAL_BUILDS="alpine manylinux2010_x86_64" LINKAGE=std @@ -51,19 +51,17 @@ if: tag IS PRESENT os: osx compiler: gcc - env: LINKAGE=std + env: LINKAGE=std HOMEBREW_NO_AUTO_UPDATE=1 before_script: - - ./configure --install-deps --disable-lz4-ext --prefix="$PWD/dest" --enable-werror --enable-strip + - ./configure --install-deps --source-deps-only --disable-lz4-ext --prefix="$PWD/dest" --enable-strip - name: "OSX clang: +static" + if: tag IS PRESENT os: osx - # Use an older image to disable syslog and for broader compatibility - # with old and new osx versions. - osx_image: xcode9.2 compiler: clang env: LINKAGE=static HOMEBREW_NO_AUTO_UPDATE=1 before_script: - - ./configure --install-deps --disable-lz4-ext --prefix="$PWD/dest" --enable-static --disable-syslog --enable-strip + - ./configure --install-deps --source-deps-only --disable-lz4-ext --prefix="$PWD/dest" --enable-static --enable-strip - name: "Windows MinGW-w64 Dynamic" if: tag IS PRESENT @@ -110,6 +108,7 @@ - ./configure --disable-gssapi --install-deps --source-deps-only --enable-static --disable-lz4-ext --prefix="$PWD/dest" --enable-strip - name: "Linux GCC s390x: +devel" + if: tag IS PRESENT os: linux arch: s390x dist: bionic diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/CHANGELOG.md new/librdkafka-1.8.2/CHANGELOG.md --- old/librdkafka-1.8.0/CHANGELOG.md 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/CHANGELOG.md 2021-10-11 22:15:44.000000000 +0200 @@ -1,3 +1,58 @@ +# librdkafka v1.8.2 + +librdkafka v1.8.2 is a maintenance release. + +## Enhancements + + * Added `ssl.ca.pem` to add CA certificate by PEM string. (#2380) + * Prebuilt binaries for Mac OSX now contain statically linked OpenSSL v1.1.1l. + Previously the OpenSSL version was either v1.1.1 or v1.0.2 depending on + build type. + +## Fixes + + * The `librdkafka.redist` 1.8.0 package had two flaws: + - the linux-arm64 .so build was a linux-x64 build. + - the included Windows MSVC 140 runtimes for x64 were infact x86. + The release script has been updated to verify the architectures of + provided artifacts to avoid this happening in the future. + * Prebuilt binaries for Mac OSX Sierra (10.12) and older are no longer provided. + This affects [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go). + * Some of the prebuilt binaries for Linux were built on Ubuntu 14.04, + these builds are now performed on Ubuntu 16.04 instead. + This may affect users on ancient Linux distributions. + * It was not possible to configure `ssl.ca.location` on OSX, the property + would automatically revert back to `probe` (default value). + This regression was introduced in v1.8.0. (#3566) + * librdkafka's internal timers would not start if the timeout was set to 0, + which would result in some timeout operations not being enforced correctly, + e.g., the transactional producer API timeouts. + These timers are now started with a timeout of 1 microsecond. + +### Transactional producer fixes + + * Upon quick repeated leader changes the transactional producer could receive + an `OUT_OF_ORDER_SEQUENCE` error from the broker, which triggered an + Epoch bump on the producer resulting in an InitProducerIdRequest being sent + to the transaction coordinator in the middle of a transaction. + This request would start a new transaction on the coordinator, but the + producer would still think (erroneously) it was in current transaction. + Any messages produced in the current transaction prior to this event would + be silently lost when the application committed the transaction, leading + to message loss. + This has been fixed by setting the Abortable transaction error state + in the producer. #3575. + * The transactional producer could stall during a transaction if the transaction + coordinator changed while adding offsets to the transaction (send_offsets_to_transaction()). + This stall lasted until the coordinator connection went down, the + transaction timed out, transaction was aborted, or messages were produced + to a new partition, whichever came first. #3571. + + + +*Note: there was no v1.8.1 librdkafka release* + + # librdkafka v1.8.0 librdkafka v1.8.0 is a security release: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/CONFIGURATION.md new/librdkafka-1.8.2/CONFIGURATION.md --- old/librdkafka-1.8.0/CONFIGURATION.md 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/CONFIGURATION.md 2021-10-11 22:15:44.000000000 +0200 @@ -68,6 +68,7 @@ ssl.certificate.pem | * | | | low | Client's public key string (PEM format) used for authentication. <br>*Type: string* ssl_certificate | * | | | low | Client's public key as set by rd_kafka_conf_set_ssl_cert() <br>*Type: see dedicated API* ssl.ca.location | * | | | low | File or directory path to CA certificate(s) for verifying the broker's key. Defaults: On Windows the system's CA certificates are automatically looked up in the Windows Root certificate store. On Mac OSX this configuration defaults to `probe`. It is recommended to install openssl using Homebrew, to provide CA certificates. On Linux install the distribution's ca-certificates package. If OpenSSL is statically linked or `ssl.ca.location` is set to `probe` a list of standard paths will be probed and the first one found will be used as the default CA certificate location path. If OpenSSL is dynamically linked the OpenSSL library's default path will be used (see `OPENSSLDIR` in `openssl version -a`). <br>*Type: string* +ssl.ca.pem | * | | | low | CA certificate string (PEM format) for verifying the broker's key. <br>*Type: string* ssl_ca | * | | | low | CA certificate as set by rd_kafka_conf_set_ssl_cert() <br>*Type: see dedicated API* ssl.ca.certificate.stores | * | | Root | low | Comma-separated list of Windows Certificate stores to load CA certificates from. Certificates will be loaded in the same order as stores are specified. If no certificates can be loaded from any of the specified stores an error is logged and the OpenSSL library's default CA location is used instead. Store names are typically one or more of: MY, Root, Trust, CA. <br>*Type: string* ssl.crl.location | * | | | low | Path to CRL for verifying broker's certificate validity. <br>*Type: string* diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/mklove/Makefile.base new/librdkafka-1.8.2/mklove/Makefile.base --- old/librdkafka-1.8.0/mklove/Makefile.base 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/mklove/Makefile.base 2021-10-11 22:15:44.000000000 +0200 @@ -134,7 +134,7 @@ @printf "$(MKL_YELLOW)Creating self-contained static library $@$(MKL_CLR_RESET)\n" ifeq ($(HAS_LIBTOOL_STATIC),y) $(LIBTOOL) -static -o $@ - $(LIBNAME).a $(MKL_STATIC_LIBS) -else # HAS_LIBTOOL_STATIC +else ifeq ($(HAS_GNU_AR),y) (_tmp=$$(mktemp arstaticXXXXXX) ; \ echo "CREATE $@" > $$_tmp ; \ for _f in $(LIBNAME).a $(MKL_STATIC_LIBS) ; do \ @@ -145,7 +145,11 @@ cat $$_tmp ; \ ar -M < $$_tmp || exit 1 ; \ rm $$_tmp) -endif # HAS_LIBTOOL_STATIC +else + for _f in $(LIBNAME).a $(MKL_STATIC_LIBS) ; do \ + ar -r $@ $$_f ; \ + done +endif cp $@ $(LIBNAME)-static-dbg.a # The self-contained static library is always stripped, regardless # of --enable-strip, since otherwise it would become too big. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/mklove/modules/configure.base new/librdkafka-1.8.2/mklove/modules/configure.base --- old/librdkafka-1.8.0/mklove/modules/configure.base 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/mklove/modules/configure.base 2021-10-11 22:15:44.000000000 +0200 @@ -499,7 +499,9 @@ else mkl_dbg "Source install of $name failed" mkl_check_failed "$iname" "" disable "source installer failed (see $ilog)" - mkl_err "$name source build failed, see $ilog for details. Last 50 lines:" + mkl_err "$name source build failed, see $ilog for details. First 50 and last 50 lines:" + head -50 "$ilog" + echo " .... and last 50 lines ...." tail -50 "$ilog" fi @@ -607,6 +609,41 @@ } +# Apply patch to a source dependency. +# +# Param 1: config name (e.g. libssl) +# Param 2: patch number (optional, else all) +# +# Returns 0 on success or 1 on error. +function mkl_patch { + local name=$1 + local patchnr="$2" + + if [[ -z $patchnr ]]; then + patchnr="????" + fi + + local patchfile= + local cnt=0 + for patchfile in $(echo ${MKLOVE_DIR}/modules/patches/${name}.${patchnr}-*.patch | sort); do + mkl_dbg "$1: applying patch $patchfile" + patch -p1 < $patchfile + local retcode=$? + if [[ $retcode != 0 ]]; then + mkl_err "mkl_patch: $1: failed to apply patch $patchfile: see source dep build log for details" + return 1 + fi + cnt=$(($cnt + 1)) + done + + if [[ $cnt -lt 1 ]]; then + mkl_err "mkl_patch: $1: no patches matchign $patchnr found" + return 1 + fi + + return 0 +} + ########################################################################### # @@ -2358,23 +2395,38 @@ # # Arguments: # url Archive URL -# checksum_type The ${checksum_type}sum tool will be used to verify the checksum. E.g., "sha256". +# shabits The SHA algorithm bit count used to verify the checksum. E.g., "256". # checksum Expected checksum of archive (use "" to not perform check) function mkl_download_archive { local url="$1" - local checksum_tool="${2}sum" + local shabits="$2" local exp_checksum="$3" local tmpfile=$(mktemp _mkltmpXXXXXX) - if ! curl -fLs -o "$tmpfile" "$url" ; then - rm -f "$tmpfile" - echo -e "ERROR: Download of $url failed" 1>&2 - return 1 + # Try both wget and curl + if ! wget -nv -O "$tmpfile" "$url" ; then + if ! curl -fLsS -o "$tmpfile" "$url" ; then + rm -f "$tmpfile" + echo -e "ERROR: Download of $url failed" 1>&2 + return 1 + fi fi if [[ -n $exp_checksum ]]; then # Verify checksum + + local checksum_tool="" + + # OSX has shasum by default, on Linux it is typically in + # some Perl package that may or may not be installed. + if $(which shasum >/dev/null 2>&1); then + checksum_tool="shasum -b -a ${shabits}" + else + # shaXsum is available in Linux coreutils + checksum_tool="sha${shabits}sum" + fi + local checksum=$($checksum_tool "$tmpfile" | cut -d' ' -f1) if [[ $? -ne 0 ]]; then rm -f "$tmpfile" diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/mklove/modules/configure.cc new/librdkafka-1.8.2/mklove/modules/configure.cc --- old/librdkafka-1.8.0/mklove/modules/configure.cc 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/mklove/modules/configure.cc 2021-10-11 22:15:44.000000000 +0200 @@ -158,6 +158,11 @@ mkl_mkvar_set staticlinking HAS_LIBTOOL_STATIC y fi fi + + # Check for GNU ar (which has the -M option) + mkl_meta_set "gnuar" "name" "GNU ar" + mkl_command_check "gnuar" "HAS_GNU_AR" disable \ + "ar -V 2>/dev/null | grep -q GNU" } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/mklove/modules/configure.libssl new/librdkafka-1.8.2/mklove/modules/configure.libssl --- old/librdkafka-1.8.0/mklove/modules/configure.libssl 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/mklove/modules/configure.libssl 2021-10-11 22:15:44.000000000 +0200 @@ -26,7 +26,7 @@ *) mkl_err "mklove internal error: invalid value for ENABLE_SSL: $ENABLE_SSL"; exit 1 ;; esac - if [[ $MKL_DISTRO == "osx" ]]; then + if [[ $MKL_SOURCE_DEPS_ONLY != y && $MKL_DISTRO == "osx" ]]; then # Add brew's OpenSSL pkg-config path on OSX # to avoid picking up the outdated system-provided openssl/libcrypto. mkl_env_append PKG_CONFIG_PATH "/usr/local/opt/openssl/lib/pkgconfig" ":" @@ -71,45 +71,53 @@ } -# No source installer on osx: rely on openssl from homebrew -if [[ $MKL_DISTRO != osx ]]; then - # Install libcrypto/libssl from source tarball on linux. # # Param 1: name (libcrypto) # Param 2: install-dir-prefix (e.g., DESTDIR) # Param 2: version (optional) - function libcrypto_install_source { - local name=$1 - local destdir=$2 - local ver=1.1.1l - local checksum="0b7a3e5e59c34827fe0c3a74b7ec8baef302b98fa80088d7f9153aa16fa76bd1" - local url=https://www.openssl.org/source/openssl-${ver}.tar.gz - - local conf_args="--openssldir=/usr/lib/ssl no-shared no-zlib no-deprecated" - if [[ $ver == 1.0.* ]]; then - extra_conf_args="${extra_conf_args} no-krb5" - fi +function libcrypto_install_source { + local name=$1 + local destdir=$2 + local ver=1.1.1l + local checksum="0b7a3e5e59c34827fe0c3a74b7ec8baef302b98fa80088d7f9153aa16fa76bd1" + local url=https://www.openssl.org/source/openssl-${ver}.tar.gz + + local conf_args="--prefix=/usr --openssldir=/usr/lib/ssl no-shared no-zlib no-deprecated" + if [[ $ver == 1.0.* ]]; then + conf_args="${conf_args} no-krb5" + fi + + echo "### Installing $name $ver from source ($url) to $destdir" + if [[ ! -f config ]]; then + echo "### Downloading" + mkl_download_archive "$url" "256" "$checksum" || return 1 + fi - echo "### Installing $name $ver from source ($url) to $destdir" - if [[ ! -f config ]]; then - echo "### Downloading" - mkl_download_archive "$url" "sha256" "$checksum" || return 1 + if [[ $MKL_DISTRO == "osx" ]]; then + # Workaround build issue in 1.1.1l on OSX with older toolchains. + if [[ $ver == 1.1.1l ]]; then + if ! mkl_patch libssl 0000 ; then + return 1 + fi fi - echo "### Configuring" - ./config --prefix="/usr" $conf_args || return $? + # Silence a load of warnings on OSX + conf_args="${conf_args} -Wno-nullability-completeness" + fi - echo "### Building" - make + echo "### Configuring with args $conf_args" + ./config $conf_args || return $? - echo "### Installing to $destdir" - if [[ $ver == 1.0.* ]]; then - make INSTALL_PREFIX="$destdir" install_sw - else - make DESTDIR="$destdir" install - fi + echo "### Building" + make - return $? - } -fi + echo "### Installing to $destdir" + if [[ $ver == 1.0.* ]]; then + make INSTALL_PREFIX="$destdir" install_sw + else + make DESTDIR="$destdir" install + fi + + return $? +} diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/mklove/modules/configure.libzstd new/librdkafka-1.8.2/mklove/modules/configure.libzstd --- old/librdkafka-1.8.0/mklove/modules/configure.libzstd 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/mklove/modules/configure.libzstd 2021-10-11 22:15:44.000000000 +0200 @@ -49,7 +49,7 @@ if [[ ! -f Makefile ]]; then mkl_download_archive \ "https://github.com/facebook/zstd/releases/download/v${ver}/zstd-${ver}.tar.gz" \ - "sha256" \ + "256" \ $checksum || return 1 fi diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/mklove/modules/configure.zlib new/librdkafka-1.8.2/mklove/modules/configure.zlib --- old/librdkafka-1.8.0/mklove/modules/configure.zlib 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/mklove/modules/configure.zlib 2021-10-11 22:15:44.000000000 +0200 @@ -49,7 +49,7 @@ if [[ ! -f Makefile ]]; then mkl_download_archive \ "https://zlib.net/zlib-${ver}.tar.gz" \ - "sha256" \ + "256" \ "$checksum" || return 1 fi diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/mklove/modules/patches/README.md new/librdkafka-1.8.2/mklove/modules/patches/README.md --- old/librdkafka-1.8.0/mklove/modules/patches/README.md 1970-01-01 01:00:00.000000000 +0100 +++ new/librdkafka-1.8.2/mklove/modules/patches/README.md 2021-10-11 22:15:44.000000000 +0200 @@ -0,0 +1,8 @@ +This directory contains patches to dependencies used by the source installers in configure.* + + +Patch filename format is: +<module>.NNNN-description_of_patch.patch + +Where module is the configure.<module> name, NNNN is the patch apply order, e.g. 0000. + diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/mklove/modules/patches/libssl.0000-osx-rand-include-fix-OpenSSL-PR16409.patch new/librdkafka-1.8.2/mklove/modules/patches/libssl.0000-osx-rand-include-fix-OpenSSL-PR16409.patch --- old/librdkafka-1.8.0/mklove/modules/patches/libssl.0000-osx-rand-include-fix-OpenSSL-PR16409.patch 1970-01-01 01:00:00.000000000 +0100 +++ new/librdkafka-1.8.2/mklove/modules/patches/libssl.0000-osx-rand-include-fix-OpenSSL-PR16409.patch 2021-10-11 22:15:44.000000000 +0200 @@ -0,0 +1,56 @@ +From cef404f1e7a598166cbc2fd2e0048f7e2d752ad5 Mon Sep 17 00:00:00 2001 +From: David Carlier <devne...@gmail.com> +Date: Tue, 24 Aug 2021 22:40:14 +0100 +Subject: [PATCH] Darwin platform allows to build on releases before + Yosemite/ios 8. + +issue #16407 #16408 +--- + crypto/rand/rand_unix.c | 5 +---- + include/crypto/rand.h | 10 ++++++++++ + 2 files changed, 11 insertions(+), 4 deletions(-) + +diff --git a/crypto/rand/rand_unix.c b/crypto/rand/rand_unix.c +index 43f1069d151d..0f4525106af7 100644 +--- a/crypto/rand/rand_unix.c ++++ b/crypto/rand/rand_unix.c +@@ -34,9 +34,6 @@ + #if defined(__OpenBSD__) + # include <sys/param.h> + #endif +-#if defined(__APPLE__) +-# include <CommonCrypto/CommonRandom.h> +-#endif + + #if defined(OPENSSL_SYS_UNIX) || defined(__DJGPP__) + # include <sys/types.h> +@@ -381,7 +378,7 @@ static ssize_t syscall_random(void *buf, size_t buflen) + if (errno != ENOSYS) + return -1; + } +-# elif defined(__APPLE__) ++# elif defined(OPENSSL_APPLE_CRYPTO_RANDOM) + if (CCRandomGenerateBytes(buf, buflen) == kCCSuccess) + return (ssize_t)buflen; + +diff --git a/include/crypto/rand.h b/include/crypto/rand.h +index 5350d3a93119..674f840fd13c 100644 +--- a/include/crypto/rand.h ++++ b/include/crypto/rand.h +@@ -20,6 +20,16 @@ + + # include <openssl/rand.h> + ++# if defined(__APPLE__) && !defined(OPENSSL_NO_APPLE_CRYPTO_RANDOM) ++# include <Availability.h> ++# if (defined(__MAC_OS_X_VERSION_MIN_REQUIRED) && __MAC_OS_X_VERSION_MIN_REQUIRED >= 101000) || \ ++ (defined(__IPHONE_OS_VERSION_MIN_REQUIRED) && __IPHONE_OS_VERSION_MIN_REQUIRED >= 80000) ++# define OPENSSL_APPLE_CRYPTO_RANDOM 1 ++# include <CommonCrypto/CommonCryptoError.h> ++# include <CommonCrypto/CommonRandom.h> ++# endif ++# endif ++ + /* forward declaration */ + typedef struct rand_pool_st RAND_POOL; + diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/packaging/nuget/packaging.py new/librdkafka-1.8.2/packaging/nuget/packaging.py --- old/librdkafka-1.8.0/packaging/nuget/packaging.py 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/packaging/nuget/packaging.py 2021-10-11 22:15:44.000000000 +0200 @@ -17,6 +17,7 @@ from collections import defaultdict import boto3 from zfile import zfile +import magic if sys.version_info[0] < 3: from urllib import unquote @@ -31,6 +32,38 @@ 'i386': 'x86', 'win32': 'x86'}} +# Filemagic arch mapping. +# key is (plat, arch, file_extension), value is a compiled filemagic regex. +# This is used to verify that an artifact has the expected file type. +magic_patterns = { + ('win', 'x64', '.dll'): re.compile('PE32.*DLL.* x86-64, for MS Windows'), + ('win', 'x86', '.dll'): re.compile('PE32.*DLL.* Intel 80386, for MS Windows'), + ('win', 'x64', '.lib'): re.compile('current ar archive'), + ('win', 'x86', '.lib'): re.compile('current ar archive'), + ('linux', 'x64', '.so'): re.compile('ELF 64.* x86-64'), + ('linux', 'arm64', '.so'): re.compile('ELF 64.* ARM aarch64'), + ('osx', 'x64', '.dylib'): re.compile('Mach-O 64.* x86_64') } + +magic = magic.Magic() + +def magic_mismatch(path, a): + """ Verify that the filemagic for \p path matches for artifact \p a. + Returns True if the magic file info does NOT match. + Returns False if no matching is needed or the magic matches. """ + k = (a.info.get('plat', None), a.info.get('arch', None), + os.path.splitext(path)[1]) + pattern = magic_patterns.get(k, None) + if pattern is None: + return False + + minfo = magic.id_filename(path) + if not pattern.match(minfo): + print(f"Warning: {path} magic \"{minfo}\" does not match expected {pattern} for key {k}") + return True + + return False + + # Collects CI artifacts from S3 storage, downloading them # to a local directory, or collecting already downloaded artifacts from # local directory. @@ -315,8 +348,6 @@ destpath=os.path.join('build', 'native')) self.copy_template('librdkafka.redist.props', destpath='build') - for f in ['../../README.md', '../../CONFIGURATION.md', '../../LICENSES.txt']: - shutil.copy(f, self.stpath) # Generate template tokens for artifacts for a in self.arts.artifacts: @@ -334,6 +365,12 @@ [{'arch': 'x64', 'plat': 'linux', 'lnk': 'std', 'fname_glob': 'librdkafka-gcc.tar.gz'}, './include/librdkafka/rdkafkacpp.h', 'build/native/include/librdkafka/rdkafkacpp.h'], [{'arch': 'x64', 'plat': 'linux', 'lnk': 'std', 'fname_glob': 'librdkafka-gcc.tar.gz'}, './include/librdkafka/rdkafka_mock.h', 'build/native/include/librdkafka/rdkafka_mock.h'], + [{'arch': 'x64', 'plat': 'linux', 'lnk': 'std', 'fname_glob': 'librdkafka-gcc.tar.gz'}, './share/doc/librdkafka/README.md', 'README.md'], + [{'arch': 'x64', 'plat': 'linux', 'lnk': 'std', 'fname_glob': 'librdkafka-gcc.tar.gz'}, './share/doc/librdkafka/CONFIGURATION.md', 'CONFIGURATION.md'], + # The above x64-linux gcc job generates a bad LICENSES.txt file, + # so we use the one from the osx job instead. + [{'arch': 'x64', 'plat': 'osx', 'lnk': 'std', 'fname_glob': 'librdkafka-gcc.tar.gz'}, './share/doc/librdkafka/LICENSES.txt', 'LICENSES.txt'], + # Travis OSX build [{'arch': 'x64', 'plat': 'osx', 'fname_glob': 'librdkafka-clang.tar.gz'}, './lib/librdkafka.dylib', 'runtimes/osx-x64/native/librdkafka.dylib'], # Travis Manylinux build @@ -396,9 +433,14 @@ found = False # Try all matching artifacts until we find the wanted file (member) for a in self.arts.artifacts: + attr_match = True for attr in attributes: if a.info.get(attr, None) != attributes[attr]: - continue + attr_match = False + break + + if not attr_match: + continue if not fnmatch(a.fname, fname_glob): continue @@ -414,6 +456,11 @@ except Exception as e: raise Exception('file not found in archive %s: %s. Files in archive are: %s' % (a.lpath, e, zfile.ZFile(a.lpath).getnames())) + # Check that the file type matches. + if magic_mismatch(outf, a): + os.unlink(outf) + continue + found = True break @@ -436,6 +483,8 @@ """ Verify package """ expect = [ "librdkafka.redist.nuspec", + "README.md", + "CONFIGURATION.md", "LICENSES.txt", "build/librdkafka.redist.props", "build/native/librdkafka.redist.targets", @@ -482,9 +531,9 @@ if len(missing) > 0: print('Missing files in package %s:\n%s' % (path, '\n'.join(missing))) return False - else: - print('OK - %d expected files found' % len(expect)) - return True + + print('OK - %d expected files found' % len(expect)) + return True class StaticPackage (Package): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/packaging/nuget/requirements.txt new/librdkafka-1.8.2/packaging/nuget/requirements.txt --- old/librdkafka-1.8.0/packaging/nuget/requirements.txt 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/packaging/nuget/requirements.txt 2021-10-11 22:15:44.000000000 +0200 @@ -1,2 +1,3 @@ -boto3 -rpmfile +boto3==1.18.45 +rpmfile==1.0.8 +filemagic==1.6 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka.h new/librdkafka-1.8.2/src/rdkafka.h --- old/librdkafka-1.8.0/src/rdkafka.h 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka.h 2021-10-11 22:15:44.000000000 +0200 @@ -158,7 +158,7 @@ * @remark This value should only be used during compile time, * for runtime checks of version use rd_kafka_version() */ -#define RD_KAFKA_VERSION 0x010800ff +#define RD_KAFKA_VERSION 0x010802ff /** * @brief Returns the librdkafka version as integer. @@ -2302,6 +2302,9 @@ * * @remark Private and public keys in PEM format may also be set with the * `ssl.key.pem` and `ssl.certificate.pem` configuration properties. + * + * @remark CA certificate in PEM format may also be set with the + * `ssl.ca.pem` configuration property. */ RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_set_ssl_cert (rd_kafka_conf_t *conf, @@ -2315,18 +2318,18 @@ * @brief Set callback_data for OpenSSL engine. * * @param conf Configuration object. - * @param callback_data passed to engine callbacks, + * @param callback_data passed to engine callbacks, * e.g. \c ENGINE_load_ssl_client_cert. * - * @remark The \c ssl.engine.location configuration must be set for this + * @remark The \c ssl.engine.location configuration must be set for this * to have affect. * - * @remark The memory pointed to by \p value must remain valid for the - * lifetime of the configuration object and any Kafka clients that + * @remark The memory pointed to by \p value must remain valid for the + * lifetime of the configuration object and any Kafka clients that * use it. */ RD_EXPORT -void rd_kafka_conf_set_engine_callback_data (rd_kafka_conf_t *conf, +void rd_kafka_conf_set_engine_callback_data (rd_kafka_conf_t *conf, void *callback_data); diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka_broker.c new/librdkafka-1.8.2/src/rdkafka_broker.c --- old/librdkafka-1.8.0/src/rdkafka_broker.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka_broker.c 2021-10-11 22:15:44.000000000 +0200 @@ -3759,6 +3759,7 @@ rd_kafka_idemp_drain_epoch_bump( rkb->rkb_rk, + RD_KAFKA_RESP_ERR__TIMED_OUT, "%d message(s) timed out " "on %s [%"PRId32"]", timeoutcnt, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka_conf.c new/librdkafka-1.8.2/src/rdkafka_conf.c --- old/librdkafka-1.8.0/src/rdkafka_conf.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka_conf.c 2021-10-11 22:15:44.000000000 +0200 @@ -817,6 +817,11 @@ "path will be used (see `OPENSSLDIR` in `openssl version -a`).", _UNSUPPORTED_SSL }, + { _RK_GLOBAL|_RK_SENSITIVE, "ssl.ca.pem", _RK_C_STR, + _RK(ssl.ca_pem), + "CA certificate string (PEM format) for verifying the broker's key.", + _UNSUPPORTED_SSL + }, { _RK_GLOBAL, "ssl_ca", _RK_C_INTERNAL, _RK(ssl.ca), "CA certificate as set by rd_kafka_conf_set_ssl_cert()", @@ -3703,11 +3708,12 @@ if (conf->ssl.keystore_location && !conf->ssl.keystore_password) return "`ssl.keystore.password` is mandatory when " "`ssl.keystore.location` is set"; - if (conf->ssl.ca && conf->ssl.ca_location) - return "`ssl.ca.location`, and memory-based " + if (conf->ssl.ca && (conf->ssl.ca_location || conf->ssl.ca_pem)) + return "`ssl.ca.location` or `ssl.ca.pem`, and memory-based " "set_ssl_cert(CERT_CA) are mutually exclusive."; #ifdef __APPLE__ - else /* Default ssl.ca.location to 'probe' on OSX */ + else if (!conf->ssl.ca && !conf->ssl.ca_location && !conf->ssl.ca_pem) + /* Default ssl.ca.location to 'probe' on OSX */ rd_kafka_conf_set(conf, "ssl.ca.location", "probe", NULL, 0); #endif #endif diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka_conf.h new/librdkafka-1.8.2/src/rdkafka_conf.h --- old/librdkafka-1.8.0/src/rdkafka_conf.h 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka_conf.h 2021-10-11 22:15:44.000000000 +0200 @@ -159,7 +159,7 @@ /* Increase in steps of 64 as needed. * This must be larger than sizeof(rd_kafka_[topic_]conf_t) */ -#define RD_KAFKA_CONF_PROPS_IDX_MAX (64*28) +#define RD_KAFKA_CONF_PROPS_IDX_MAX (64*29) /** * @struct rd_kafka_anyconf_t @@ -238,6 +238,7 @@ char *cert_pem; rd_kafka_cert_t *cert; char *ca_location; + char *ca_pem; rd_kafka_cert_t *ca; /** CSV list of Windows certificate stores */ char *ca_cert_stores; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka_idempotence.c new/librdkafka-1.8.2/src/rdkafka_idempotence.c --- old/librdkafka-1.8.0/src/rdkafka_idempotence.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka_idempotence.c 2021-10-11 22:15:44.000000000 +0200 @@ -613,7 +613,8 @@ * @locality any * @locks none */ -void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, const char *fmt, ...) { +void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, rd_kafka_resp_err_t err, + const char *fmt, ...) { va_list ap; char buf[256]; @@ -630,6 +631,11 @@ rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_DRAIN_BUMP); rd_kafka_wrunlock(rk); + /* Transactions: bumping the epoch requires the current transaction + * to be aborted. */ + if (rd_kafka_is_transactional(rk)) + rd_kafka_txn_set_abortable_error_with_bump(rk, err, "%s", buf); + /* Check right away if the drain could be done. */ rd_kafka_idemp_check_drain_done(rk); } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka_idempotence.h new/librdkafka-1.8.2/src/rdkafka_idempotence.h --- old/librdkafka-1.8.0/src/rdkafka_idempotence.h 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka_idempotence.h 2021-10-11 22:15:44.000000000 +0200 @@ -74,8 +74,9 @@ const rd_kafka_pid_t pid); void rd_kafka_idemp_pid_fsm (rd_kafka_t *rk); void rd_kafka_idemp_drain_reset (rd_kafka_t *rk, const char *reason); -void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, const char *fmt, ...) - RD_FORMAT(printf, 2, 3); +void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, rd_kafka_resp_err_t err, + const char *fmt, ...) + RD_FORMAT(printf, 3, 4); void rd_kafka_idemp_drain_toppar (rd_kafka_toppar_t *rktp, const char *reason); void rd_kafka_idemp_inflight_toppar_sub (rd_kafka_t *rk, rd_kafka_toppar_t *rktp); diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka_mock.c new/librdkafka-1.8.2/src/rdkafka_mock.c --- old/librdkafka-1.8.0/src/rdkafka_mock.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka_mock.c 2021-10-11 22:15:44.000000000 +0200 @@ -1608,6 +1608,7 @@ const rd_kafka_resp_err_t *errors) { rd_kafka_mock_error_stack_t *errstack; size_t totcnt; + size_t i; mtx_lock(&mcluster->lock); @@ -1622,8 +1623,8 @@ sizeof(*errstack->errs)); } - while (cnt > 0) { - errstack->errs[errstack->cnt].err = errors[--cnt]; + for (i = 0 ; i < cnt ; i++) { + errstack->errs[errstack->cnt].err = errors[i]; errstack->errs[errstack->cnt++].rtt = 0; } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka_request.c new/librdkafka-1.8.2/src/rdkafka_request.c --- old/librdkafka-1.8.0/src/rdkafka_request.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka_request.c 2021-10-11 22:15:44.000000000 +0200 @@ -2660,7 +2660,7 @@ perr->update_next_err = rd_true; rd_kafka_idemp_drain_epoch_bump( - rk, "skipped sequence numbers"); + rk, perr->err, "skipped sequence numbers"); } else { /* Request's sequence is less than next ack, @@ -2763,7 +2763,7 @@ firstmsg->rkm_u.producer.retries); /* Drain outstanding requests and bump epoch. */ - rd_kafka_idemp_drain_epoch_bump(rk, + rd_kafka_idemp_drain_epoch_bump(rk, perr->err, "unknown producer id"); rd_kafka_txn_set_abortable_error_with_bump( @@ -2800,7 +2800,7 @@ firstmsg->rkm_u.producer.retries); /* Drain outstanding requests and bump epoch. */ - rd_kafka_idemp_drain_epoch_bump(rk, + rd_kafka_idemp_drain_epoch_bump(rk, perr->err, "unknown producer id"); perr->incr_retry = 0; @@ -3169,7 +3169,7 @@ /* Drain outstanding requests and bump the epoch .*/ rd_kafka_idemp_drain_epoch_bump( - rk, "message sequence gap"); + rk, perr->err, "message sequence gap"); } perr->update_next_ack = rd_false; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka_ssl.c new/librdkafka-1.8.2/src/rdkafka_ssl.c --- old/librdkafka-1.8.0/src/rdkafka_ssl.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka_ssl.c 2021-10-11 22:15:44.000000000 +0200 @@ -955,6 +955,7 @@ */ static int rd_kafka_ssl_set_certs (rd_kafka_t *rk, SSL_CTX *ctx, char *errstr, size_t errstr_size) { + rd_bool_t ca_probe = rd_true; rd_bool_t check_pkey = rd_false; int r; @@ -972,31 +973,74 @@ /* OpenSSL takes ownership of the store */ rk->rk_conf.ssl.ca->store = NULL; - } else if (rk->rk_conf.ssl.ca_location && - strcmp(rk->rk_conf.ssl.ca_location, "probe")) { - /* CA certificate location, either file or directory. */ - int is_dir = rd_kafka_path_is_dir(rk->rk_conf.ssl.ca_location); + ca_probe = rd_false; - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading CA certificate(s) from %s %s", - is_dir ? "directory" : "file", - rk->rk_conf.ssl.ca_location); - - r = SSL_CTX_load_verify_locations(ctx, - !is_dir ? - rk->rk_conf.ssl. - ca_location : NULL, - is_dir ? - rk->rk_conf.ssl. - ca_location : NULL); - - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "ssl.ca.location failed: "); - return -1; + } else { + + if (rk->rk_conf.ssl.ca_location && + strcmp(rk->rk_conf.ssl.ca_location, "probe")) { + /* CA certificate location, either file or directory. */ + int is_dir = rd_kafka_path_is_dir( + rk->rk_conf.ssl.ca_location); + + rd_kafka_dbg(rk, SECURITY, "SSL", + "Loading CA certificate(s) from %s %s", + is_dir ? "directory" : "file", + rk->rk_conf.ssl.ca_location); + + r = SSL_CTX_load_verify_locations(ctx, + !is_dir ? + rk->rk_conf.ssl. + ca_location : NULL, + is_dir ? + rk->rk_conf.ssl. + ca_location : NULL); + + if (r != 1) { + rd_snprintf(errstr, errstr_size, + "ssl.ca.location failed: "); + return -1; + } + + ca_probe = rd_false; } - } else { + if (rk->rk_conf.ssl.ca_pem) { + /* CA as PEM string */ + X509 *x509; + X509_STORE *store; + + /* Get the OpenSSL trust store */ + store = SSL_CTX_get_cert_store(ctx); + rd_assert(store != NULL); + + rd_kafka_dbg(rk, SECURITY, "SSL", + "Loading CA certificate from string"); + + x509 = rd_kafka_ssl_X509_from_string( + rk, rk->rk_conf.ssl.ca_pem); + if (!x509) { + rd_snprintf(errstr, errstr_size, + "ssl.ca.pem failed: " + "not in PEM format?: "); + return -1; + } + + if (!X509_STORE_add_cert(store, x509)) { + rd_snprintf(errstr, errstr_size, + "failed to add ssl.ca.pem to " + "CA cert store: "); + X509_free(x509); + return -1; + } + + X509_free(x509); + + ca_probe = rd_false; + } + } + + if (ca_probe) { #ifdef _WIN32 /* Attempt to load CA root certificates from the * configured Windows certificate stores. */ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka_timer.c new/librdkafka-1.8.2/src/rdkafka_timer.c --- old/librdkafka-1.8.0/src/rdkafka_timer.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka_timer.c 2021-10-11 22:15:44.000000000 +0200 @@ -180,7 +180,10 @@ rd_kafka_timer_stop(rkts, rtmr, 0/*!lock*/); - rtmr->rtmr_interval = interval; + /* Make sure the timer interval is non-zero or the timer + * won't be scheduled, which is not what the caller of .._start*() + * would expect. */ + rtmr->rtmr_interval = interval == 0 ? 1 : interval; rtmr->rtmr_callback = callback; rtmr->rtmr_arg = arg; rtmr->rtmr_oneshot = oneshot; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src/rdkafka_txnmgr.c new/librdkafka-1.8.2/src/rdkafka_txnmgr.c --- old/librdkafka-1.8.0/src/rdkafka_txnmgr.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src/rdkafka_txnmgr.c 2021-10-11 22:15:44.000000000 +0200 @@ -45,7 +45,7 @@ static void rd_kafka_txn_curr_api_reply_error (rd_kafka_q_t *rkq, rd_kafka_error_t *error); -static void rd_kafka_txn_coord_timer_restart (rd_kafka_t *rk, int timeout_ms); +static void rd_kafka_txn_coord_timer_start (rd_kafka_t *rk, int timeout_ms); /** @@ -1883,9 +1883,10 @@ err = rd_kafka_txn_normalize_err(err); rd_kafka_dbg(rk, EOS, "ADDOFFSETS", - "AddOffsetsToTxn response from %s: %s (actions 0x%x)", + "AddOffsetsToTxn response from %s: %s (%s)", rkb ? rd_kafka_broker_name(rkb) : "(none)", - rd_kafka_err2name(err), actions); + rd_kafka_err2name(err), + rd_kafka_actions2str(actions)); /* All unhandled errors are considered permanent */ if (err && !actions) @@ -1896,22 +1897,28 @@ "Failed to add offsets to " "transaction: %s", rd_kafka_err2str(err)); + } else { + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) + rd_kafka_txn_coord_timer_start(rk, 50); - } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) { - rd_rkb_dbg(rkb, EOS, "ADDOFFSETS", - "Failed to add offsets to transaction on broker %s: " - "%s (after %dms): error is retriable", - rd_kafka_broker_name(rkb), - rd_kafka_err2str(err), - (int)(request->rkbuf_ts_sent/1000)); - - if (!rd_timeout_expired(remains_ms) && - rd_kafka_buf_retry(rk->rk_eos.txn_coord, request)) { - rk->rk_eos.txn_req_cnt++; - return; - } - /* Propagate as retriable error through api_reply() below */ + if (actions & RD_KAFKA_ERR_ACTION_RETRY) { + rd_rkb_dbg(rkb, EOS, "ADDOFFSETS", + "Failed to add offsets to transaction on " + "broker %s: %s (after %dms): " + "error is retriable", + rd_kafka_broker_name(rkb), + rd_kafka_err2str(err), + (int)(request->rkbuf_ts_sent/1000)); + + if (!rd_timeout_expired(remains_ms) && + rd_kafka_buf_retry(rk->rk_eos.txn_coord, request)) { + rk->rk_eos.txn_req_cnt++; + return; + } + /* Propagate as retriable error through + * api_reply() below */ + } } if (err) @@ -2287,7 +2294,7 @@ rd_kafka_err2str(err)); } else { if (actions & RD_KAFKA_ERR_ACTION_REFRESH) - rd_kafka_txn_coord_timer_restart(rk, 500); + rd_kafka_txn_coord_timer_start(rk, 50); if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) rd_kafka_txn_set_abortable_error(rk, err, @@ -2915,15 +2922,17 @@ } /** - * @brief (Re-)Start coord query timer + * @brief Start coord query timer if not already started. * * @locality rdkafka main thread * @locks none */ -static void rd_kafka_txn_coord_timer_restart (rd_kafka_t *rk, int timeout_ms) { +static void rd_kafka_txn_coord_timer_start (rd_kafka_t *rk, int timeout_ms) { rd_assert(rd_kafka_is_transactional(rk)); rd_kafka_timer_start_oneshot(&rk->rk_timers, - &rk->rk_eos.txn_coord_tmr, rd_true, + &rk->rk_eos.txn_coord_tmr, + /* don't restart if already started */ + rd_false, 1000 * timeout_ms, rd_kafka_txn_coord_timer_cb, rk); } @@ -3079,7 +3088,7 @@ if (rd_kafka_idemp_check_error(rk, err, errstr, rd_false)) return rd_true; - rd_kafka_txn_coord_timer_restart(rk, 500); + rd_kafka_txn_coord_timer_start(rk, 500); return rd_false; } @@ -3106,7 +3115,7 @@ if (rd_kafka_idemp_check_error(rk, err, errstr, rd_false)) return rd_true; /* Fatal error */ - rd_kafka_txn_coord_timer_restart(rk, 500); + rd_kafka_txn_coord_timer_start(rk, 500); return rd_false; } @@ -3140,7 +3149,7 @@ if (!rkb) { rd_kafka_dbg(rk, EOS, "TXNCOORD", "%s", buf); /* Keep querying for the coordinator */ - rd_kafka_txn_coord_timer_restart(rk, 500); + rd_kafka_txn_coord_timer_start(rk, 500); } return rd_false; } @@ -3165,7 +3174,7 @@ if (!rkb) { /* Lost the current coordinator, query for new coordinator */ - rd_kafka_txn_coord_timer_restart(rk, 500); + rd_kafka_txn_coord_timer_start(rk, 500); } else { /* Trigger PID state machine */ rd_kafka_idemp_pid_fsm(rk); @@ -3197,7 +3206,7 @@ /* Coordinator is down, the connection will be re-established * automatically, but we also trigger a coordinator query * to pick up on coordinator change. */ - rd_kafka_txn_coord_timer_restart(rk, 500); + rd_kafka_txn_coord_timer_start(rk, 500); } else { /* Coordinator is up. */ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/src-cpp/rdkafkacpp.h new/librdkafka-1.8.2/src-cpp/rdkafkacpp.h --- old/librdkafka-1.8.0/src-cpp/rdkafkacpp.h 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/src-cpp/rdkafkacpp.h 2021-10-11 22:15:44.000000000 +0200 @@ -111,7 +111,7 @@ * @remark This value should only be used during compile time, * for runtime checks of version use RdKafka::version() */ -#define RD_KAFKA_VERSION 0x010800ff +#define RD_KAFKA_VERSION 0x010802ff /** * @brief Returns the librdkafka version as integer. @@ -1305,6 +1305,9 @@ * * @remark Private and public keys in PEM format may also be set with the * `ssl.key.pem` and `ssl.certificate.pem` configuration properties. + * + * @remark CA certificate in PEM format may also be set with the + * `ssl.ca.pem` configuration property. */ virtual Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type, RdKafka::CertificateEncoding cert_enc, @@ -1419,14 +1422,14 @@ */ virtual struct rd_kafka_topic_conf_s *c_ptr_topic () = 0; - /** + /** * @brief Set callback_data for ssl engine. * - * @remark The \c ssl.engine.location configuration must be set for this + * @remark The \c ssl.engine.location configuration must be set for this * to have affect. * - * @remark The memory pointed to by \p value must remain valid for the - * lifetime of the configuration object and any Kafka clients that + * @remark The memory pointed to by \p value must remain valid for the + * lifetime of the configuration object and any Kafka clients that * use it. * * @returns CONF_OK on success, else CONF_INVALID. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/tests/0004-conf.c new/librdkafka-1.8.2/tests/0004-conf.c --- old/librdkafka-1.8.0/tests/0004-conf.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/tests/0004-conf.c 2021-10-11 22:15:44.000000000 +0200 @@ -607,6 +607,26 @@ rd_kafka_conf_destroy(conf); } +#if WITH_SSL + { + TEST_SAY("Verifying that ssl.ca.location is not " + "overwritten (#3566)\n"); + + conf = rd_kafka_conf_new(); + + test_conf_set(conf, "security.protocol", "SSL"); + test_conf_set(conf, "ssl.ca.location", "/?/does/!/not/exist!"); + + rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, + errstr, sizeof(errstr)); + TEST_ASSERT(!rk, + "Expected rd_kafka_new() to fail with " + "invalid ssl.ca.location"); + TEST_SAY("rd_kafka_new() failed as expected: %s\n", + errstr); + } +#endif + /* Canonical int values, aliases, s2i-verified strings, doubles */ { static const struct { diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/tests/0097-ssl_verify.cpp new/librdkafka-1.8.2/tests/0097-ssl_verify.cpp --- old/librdkafka-1.8.0/tests/0097-ssl_verify.cpp 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/tests/0097-ssl_verify.cpp 2021-10-11 22:15:44.000000000 +0200 @@ -132,7 +132,7 @@ std::string errstr; if (conf->set(loc_prop, "", errstr) != RdKafka::Conf::CONF_OK) - Test::Fail("Failed to reset " + loc_prop); + Test::Fail("Failed to reset " + loc_prop + ": " + errstr); /* Read file */ std::ifstream ifs(loc.c_str()); @@ -143,7 +143,7 @@ " from disk and changed to in-memory " + pem_prop + "\n"); if (conf->set(pem_prop, pem, errstr) != RdKafka::Conf::CONF_OK) - Test::Fail("Failed to set " + pem_prop); + Test::Fail("Failed to set " + pem_prop + ": " + errstr); } /** @@ -257,7 +257,9 @@ conf_location_to_setter(conf, "ssl.certificate.location", RdKafka::CERT_PUBLIC_KEY, pub_enc); - if (load_ca == USE_SETTER) + if (load_ca == USE_CONF) + conf_location_to_pem(conf, "ssl.ca.location", "ssl.ca.pem"); + else if (load_ca == USE_SETTER) conf_location_to_setter(conf, "ssl.ca.location", RdKafka::CERT_CA, ca_enc); @@ -376,8 +378,8 @@ return 0; } - do_test_bad_calls(); + do_test_bad_calls(); do_test_verify(__LINE__, true, USE_LOCATION, RdKafka::CERT_ENC_PEM, @@ -394,6 +396,10 @@ USE_CONF, RdKafka::CERT_ENC_PEM, USE_LOCATION, RdKafka::CERT_ENC_PEM); do_test_verify(__LINE__, true, + USE_CONF, RdKafka::CERT_ENC_PEM, + USE_CONF, RdKafka::CERT_ENC_PEM, + USE_CONF, RdKafka::CERT_ENC_PEM); + do_test_verify(__LINE__, true, USE_SETTER, RdKafka::CERT_ENC_PEM, USE_SETTER, RdKafka::CERT_ENC_PEM, USE_SETTER, RdKafka::CERT_ENC_PKCS12); @@ -408,4 +414,42 @@ return 0; } + + + int main_0097_ssl_verify_local (int argc, char **argv) { + if (!test_check_builtin("ssl")) { + Test::Skip("Test requires SSL support\n"); + return 0; + } + + + /* Check that creating a client with an invalid PEM string fails. */ + const std::string props[] = { "ssl.ca.pem", "ssl.key.pem", + "ssl.certificate.pem", "" }; + + for (int i = 0 ; props[i] != "" ; i++) { + RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + + std::string errstr; + + if (conf->set("security.protocol", "SSL", errstr)) + Test::Fail(errstr); + conf->set("debug", "security", errstr); + if (conf->set(props[i], "this is \n not a \t PEM!", errstr)) + Test::Fail("Setting " + props[i] + " to junk should work, " + "expecting failure on client creation"); + + RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); + delete conf; + if (producer) + Test::Fail("Expected producer creation to fail with " + props[i] + + " set to junk"); + else + Test::Say("Failed to create producer with junk " + props[i] + + " (as expected): " + errstr + "\n"); + } + + return 0; + } + } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/tests/0105-transactions_mock.c new/librdkafka-1.8.2/tests/0105-transactions_mock.c --- old/librdkafka-1.8.0/tests/0105-transactions_mock.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/tests/0105-transactions_mock.c 2021-10-11 22:15:44.000000000 +0200 @@ -1406,7 +1406,6 @@ /** * @brief Switch coordinator during a transaction. * - * @remark Currently fails due to insufficient coord switch handling. */ static void do_test_txn_switch_coordinator (void) { rd_kafka_t *rk; @@ -1476,6 +1475,68 @@ /** + * @brief Switch coordinator during a transaction when AddOffsetsToTxn + * are sent. #3571. + */ +static void do_test_txn_switch_coordinator_refresh (void) { + rd_kafka_t *rk; + rd_kafka_mock_cluster_t *mcluster; + const char *topic = "test"; + const char *transactional_id = "txnid"; + rd_kafka_topic_partition_list_t *offsets; + rd_kafka_consumer_group_metadata_t *cgmetadata; + + SUB_TEST("Test switching coordinators (refresh)"); + + rk = create_txn_producer(&mcluster, transactional_id, 3, NULL); + + rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id, + 1); + + /* Start transactioning */ + TEST_SAY("Starting transaction\n"); + TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000)); + + TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); + + /* Switch the coordinator so that AddOffsetsToTxnRequest + * will respond with NOT_COORDINATOR. */ + TEST_SAY("Switching to coordinator 2\n"); + rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id, + 2); + + /* + * Send some arbitrary offsets. + */ + offsets = rd_kafka_topic_partition_list_new(4); + rd_kafka_topic_partition_list_add(offsets, "srctopic", + 3)->offset = 12; + rd_kafka_topic_partition_list_add(offsets, "srctop2", + 99)->offset = 99999; + + cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid"); + + TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction( + rk, offsets, + cgmetadata, 20*1000)); + + rd_kafka_consumer_group_metadata_destroy(cgmetadata); + rd_kafka_topic_partition_list_destroy(offsets); + + + /* Produce some messages */ + test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10, NULL, 0); + + /* And commit the transaction */ + TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1)); + + rd_kafka_destroy(rk); + + SUB_TEST_PASS(); +} + + +/** * @brief Test fatal error handling when transactions are not supported * by the broker. */ @@ -2557,6 +2618,141 @@ SUB_TEST_PASS(); } + +/** + * @brief #3575: Verify that OUT_OF_ORDER_SEQ does not trigger an epoch bump + * during an ongoing transaction. + * The transaction should instead enter the abortable state. + */ +static void do_test_out_of_order_seq (void) { + rd_kafka_t *rk; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_error_t *error; + int32_t txn_coord = 1, leader = 2; + const char *txnid = "myTxnId"; + test_timing_t timing; + rd_kafka_resp_err_t err; + + SUB_TEST_QUICK(); + + rk = create_txn_producer(&mcluster, txnid, 3, + "batch.num.messages", "1", + NULL); + + rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, + txn_coord); + + rd_kafka_mock_partition_set_leader(mcluster, "mytopic", 0, leader); + + test_curr->ignore_dr_err = rd_true; + test_curr->is_fatal_cb = NULL; + + TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1)); + + /* + * Start a transaction + */ + TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); + + + + /* Produce one seeding message first to get the leader up and running */ + TEST_CALL_ERR__(rd_kafka_producev(rk, + RD_KAFKA_V_TOPIC("mytopic"), + RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_VALUE("hi", 2), + RD_KAFKA_V_END)); + test_flush(rk, -1); + + /* Let partition leader have a latency of 2 seconds + * so that we can have multiple messages in-flight. */ + rd_kafka_mock_broker_set_rtt(mcluster, leader, 2*1000); + + /* Produce a message, let it fail with with different errors, + * ending with OUT_OF_ORDER which previously triggered an + * Epoch bump. */ + rd_kafka_mock_push_request_errors( + mcluster, + RD_KAFKAP_Produce, + 3, + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, + RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER); + + /* Produce three messages that will be delayed + * and have errors injected.*/ + TEST_CALL_ERR__(rd_kafka_producev(rk, + RD_KAFKA_V_TOPIC("mytopic"), + RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_VALUE("hi", 2), + RD_KAFKA_V_END)); + TEST_CALL_ERR__(rd_kafka_producev(rk, + RD_KAFKA_V_TOPIC("mytopic"), + RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_VALUE("hi", 2), + RD_KAFKA_V_END)); + TEST_CALL_ERR__(rd_kafka_producev(rk, + RD_KAFKA_V_TOPIC("mytopic"), + RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_VALUE("hi", 2), + RD_KAFKA_V_END)); + + /* Now sleep a short while so that the messages are processed + * by the broker and errors are returned. */ + TEST_SAY("Sleeping..\n"); + rd_sleep(5); + + rd_kafka_mock_broker_set_rtt(mcluster, leader, 0); + + /* Produce a fifth message, should fail with ERR__STATE since + * the transaction should have entered the abortable state. */ + err = rd_kafka_producev(rk, + RD_KAFKA_V_TOPIC("mytopic"), + RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_VALUE("hi", 2), + RD_KAFKA_V_END); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE, + "Expected produce() to fail with ERR__STATE, not %s", + rd_kafka_err2name(err)); + TEST_SAY("produce() failed as expected: %s\n", + rd_kafka_err2str(err)); + + /* Commit the transaction, should fail with abortable error. */ + TIMING_START(&timing, "commit_transaction(-1)"); + error = rd_kafka_commit_transaction(rk, -1); + TIMING_STOP(&timing); + TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail"); + + TEST_SAY("commit_transaction() failed (expectedly): %s\n", + rd_kafka_error_string(error)); + + TEST_ASSERT(!rd_kafka_error_is_fatal(error), + "Did not expect fatal error"); + TEST_ASSERT(rd_kafka_error_txn_requires_abort(error), + "Expected abortable error"); + rd_kafka_error_destroy(error); + + /* Abort the transaction */ + TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1)); + + /* Run a new transaction without errors to verify that the + * producer can recover. */ + TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); + + TEST_CALL_ERR__(rd_kafka_producev(rk, + RD_KAFKA_V_TOPIC("mytopic"), + RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_VALUE("hi", 2), + RD_KAFKA_V_END)); + + TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1)); + + rd_kafka_destroy(rk); + + SUB_TEST_PASS(); +} + + int main_0105_transactions_mock (int argc, char **argv) { if (test_needs_auth()) { TEST_SKIP("Mock cluster does not support SSL/SASL\n"); @@ -2623,5 +2819,9 @@ do_test_txn_switch_coordinator(); + do_test_txn_switch_coordinator_refresh(); + + do_test_out_of_order_seq(); + return 0; } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/librdkafka-1.8.0/tests/test.c new/librdkafka-1.8.2/tests/test.c --- old/librdkafka-1.8.0/tests/test.c 2021-09-15 21:46:12.000000000 +0200 +++ new/librdkafka-1.8.2/tests/test.c 2021-10-11 22:15:44.000000000 +0200 @@ -208,6 +208,7 @@ _TEST_DECL(0094_idempotence_msg_timeout); _TEST_DECL(0095_all_brokers_down); _TEST_DECL(0097_ssl_verify); +_TEST_DECL(0097_ssl_verify_local); _TEST_DECL(0098_consumer_txn); _TEST_DECL(0099_commit_metadata); _TEST_DECL(0100_thread_interceptors); @@ -409,6 +410,7 @@ #endif _TEST(0095_all_brokers_down, TEST_F_LOCAL), _TEST(0097_ssl_verify, 0), + _TEST(0097_ssl_verify_local, TEST_F_LOCAL), _TEST(0098_consumer_txn, 0, TEST_BRKVER(0,11,0,0)), _TEST(0099_commit_metadata, 0), _TEST(0100_thread_interceptors, TEST_F_LOCAL),