Script 'mail_helper' called by obssrc Hello community, here is the log from the commit of package python-iptables for openSUSE:Factory checked in at 2023-12-08 22:32:20 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-iptables (Old) and /work/SRC/openSUSE:Factory/.python-iptables.new.25432 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-iptables" Fri Dec 8 22:32:20 2023 rev:8 rq:1131979 version:1.0.1 Changes: -------- --- /work/SRC/openSUSE:Factory/python-iptables/python-iptables.changes 2018-06-28 15:13:41.179563489 +0200 +++ /work/SRC/openSUSE:Factory/.python-iptables.new.25432/python-iptables.changes 2023-12-08 22:32:52.153800900 +0100 @@ -1,0 +2,6 @@ +Thu Dec 7 22:57:15 UTC 2023 - Dirk Müller <dmuel...@suse.com> + +- update to 1.0.1: + * https://github.com/ldx/python-iptables/compare/v0.13.0...v1.0.1 + +------------------------------------------------------------------- Old: ---- v0.13.0.tar.gz New: ---- v1.0.1.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-iptables.spec ++++++ --- /var/tmp/diff_new_pack.xc1sN4/_old 2023-12-08 22:32:52.585816796 +0100 +++ /var/tmp/diff_new_pack.xc1sN4/_new 2023-12-08 22:32:52.589816943 +0100 @@ -1,7 +1,7 @@ # # spec file for package python-iptables # -# Copyright (c) 2018 SUSE LINUX GmbH, Nuernberg, Germany. +# Copyright (c) 2023 SUSE LLC # # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -12,13 +12,13 @@ # license that conforms to the Open Source Definition (Version 1.9) # published by the Open Source Initiative. -# Please submit bugfixes or comments via http://bugs.opensuse.org/ +# Please submit bugfixes or comments via https://bugs.opensuse.org/ # %{?!python_module:%define python_module() python-%{**} python3-%{**}} Name: python-iptables -Version: 0.13.0 +Version: 1.0.1 Release: 0 Summary: Python bindings for iptables License: Apache-2.0 ++++++ v0.13.0.tar.gz -> v1.0.1.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/.github/workflows/build.yaml new/python-iptables-1.0.1/.github/workflows/build.yaml --- old/python-iptables-0.13.0/.github/workflows/build.yaml 1970-01-01 01:00:00.000000000 +0100 +++ new/python-iptables-1.0.1/.github/workflows/build.yaml 2023-01-22 21:50:31.000000000 +0100 @@ -0,0 +1,84 @@ +name: Build & Test +on: + push: + branches: + - main + tags: + - "v*" + pull_request: +jobs: + build-and-test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: + - "3.7" + - "3.8" + - "3.9" + - "3.10" + - "3.11" + steps: + - name: Check out code + uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Build package + run: | + python -m pip install --upgrade build twine + python -m build + twine check --strict dist/* + - name: Install coveralls + run: sudo pip install coveralls + - name: Run tests + run: sudo PATH=$PATH coverage run setup.py test + + release: + runs-on: ubuntu-latest + if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v') + needs: + - build-and-test + steps: + - name: Check out code + uses: actions/checkout@v2 + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: "3.x" + - name: Build package + run: | + python -m pip install --upgrade build twine + python -m build + twine check --strict dist/* + rm -f dist/*.whl + - name: Publish package + uses: pypa/gh-action-pypi-publish@release/v1 + with: + user: __token__ + password: ${{ secrets.PYPI_API_TOKEN }} + - name: Create GitHub release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ github.ref }} + release_name: ${{ github.ref }} + draft: false + prerelease: false + - name: Set asset name + run: | + export PKG=$(ls dist/ | grep tar) + set -- $PKG + echo "name=$1" >> $GITHUB_ENV + - name: Upload release asset to GitHub + id: upload-release-asset + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: dist/${{ env.name }} + asset_name: ${{ env.name }} + asset_content_type: application/zip diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/.travis.yml new/python-iptables-1.0.1/.travis.yml --- old/python-iptables-0.13.0/.travis.yml 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/.travis.yml 1970-01-01 01:00:00.000000000 +0100 @@ -1,13 +0,0 @@ -language: python -python: - - "2.6" - - "2.7" - - "3.4" -install: - - python setup.py build - - python setup.py install - - sudo pip install coveralls -script: - - sudo PATH=$PATH coverage run setup.py test -after_success: - coveralls diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/README.md new/python-iptables-1.0.1/README.md --- old/python-iptables-0.13.0/README.md 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/README.md 2023-01-22 21:50:31.000000000 +0100 @@ -135,6 +135,37 @@ Examples ======== +High level abstractions +----------------------- + +``python-iptables`` implements a low-level interface that tries to closely +match the underlying C libraries. The module ``iptc.easy`` improves the +usability of the library by providing a rich set of high-level functions +designed to simplify the interaction with the library, for example: + + >>> import iptc + >>> iptc.easy.dump_table('nat', ipv6=False) + {'INPUT': [], 'OUTPUT': [], 'POSTROUTING': [], 'PREROUTING': []} + >>> iptc.easy.dump_chain('filter', 'OUTPUT', ipv6=False) + [{'comment': {'comment': 'DNS traffic to Google'}, + 'counters': (1, 56), + 'dst': '8.8.8.8/32', + 'protocol': 'udp', + 'target': 'ACCEPT', + 'udp': {'dport': '53'}}] + >>> iptc.easy.add_chain('filter', 'TestChain') + True + >>> rule_d = {'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}} + >>> iptc.easy.insert_rule('filter', 'TestChain', rule_d) + >>> iptc.easy.dump_chain('filter', 'TestChain') + [{'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}] + >>> iptc.easy.delete_chain('filter', 'TestChain', flush=True) + + >>> # Example of goto rule // iptables -A FORWARD -p gre -g TestChainGoto + >>> iptc.easy.add_chain('filter', 'TestChainGoto') + >>> rule_goto_d = {'protocol': 'gre', 'target': {'goto': 'TestChainGoto'}} + >>> iptc.easy.insert_rule('filter', 'FORWARD', rule_goto_d) + Rules ----- @@ -546,6 +577,21 @@ The drawback is that Table is a singleton, and if you disable autocommit, it will be disabled for all instances of that Table. +Easy rules with dictionaries +---------------------------- +To simplify operations with ``python-iptables`` rules we have included support to define and convert Rules object into python dictionaries. + + >>> import iptc + >>> table = iptc.Table(iptc.Table.FILTER) + >>> chain = iptc.Chain(table, "INPUT") + >>> # Create an iptc.Rule object from dictionary + >>> rule_d = {'comment': {'comment': 'Match tcp.22'}, 'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}} + >>> rule = iptc.easy.encode_iptc_rule(rule_d) + >>> # Obtain a dictionary representation from the iptc.Rule + >>> iptc.easy.decode_iptc_rule(rule) + {'tcp': {'dport': '22'}, 'protocol': 'tcp', 'comment': {'comment': 'Match tcp.22'}, 'target': 'ACCEPT'} + + Known Issues ============ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/doc/examples.rst new/python-iptables-1.0.1/doc/examples.rst --- old/python-iptables-0.13.0/doc/examples.rst 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/doc/examples.rst 2023-01-22 21:50:31.000000000 +0100 @@ -1,6 +1,37 @@ Examples ======== +High level abstractions +----------------------- + +``python-iptables`` implements a low-level interface that tries to closely +match the underlying C libraries. The module ``iptc.easy`` improves the +usability of the library by providing a rich set of high-level functions +designed to simplify the interaction with the library, for example: + + >>> import iptc + >>> iptc.easy.dump_table('nat', ipv6=False) + {'INPUT': [], 'OUTPUT': [], 'POSTROUTING': [], 'PREROUTING': []} + >>> iptc.easy.dump_chain('filter', 'OUTPUT', ipv6=False) + [{'comment': {'comment': 'DNS traffic to Google'}, + 'counters': (1, 56), + 'dst': '8.8.8.8/32', + 'protocol': 'udp', + 'target': 'ACCEPT', + 'udp': {'dport': '53'}}] + >>> iptc.easy.add_chain('filter', 'TestChain') + True + >>> rule_d = {'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}} + >>> iptc.easy.insert_rule('filter', 'TestChain', rule_d) + >>> iptc.easy.dump_chain('filter', 'TestChain') + [{'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}] + >>> iptc.easy.delete_chain('filter', 'TestChain', flush=True) + + >>> # Example of goto rule // iptables -A FORWARD -p gre -g TestChainGoto + >>> iptc.easy.add_chain('filter', 'TestChainGoto') + >>> rule_goto_d = {'protocol': 'gre', 'target': {'goto': 'TestChainGoto'}} + >>> iptc.easy.insert_rule('filter', 'FORWARD', rule_goto_d) + Rules ----- @@ -410,6 +441,21 @@ The drawback is that `Table` is a singleton, and if you disable autocommit, it will be disabled for all instances of that `Table`. +Easy rules with dictionaries +---------------------------- +To simplify operations with ``python-iptables`` rules we have included support to define and convert Rules object into python dictionaries. + + >>> import iptc + >>> table = iptc.Table(iptc.Table.FILTER) + >>> chain = iptc.Chain(table, "INPUT") + >>> # Create an iptc.Rule object from dictionary + >>> rule_d = {'comment': {'comment': 'Match tcp.22'}, 'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}} + >>> rule = iptc.easy.encode_iptc_rule(rule_d) + >>> # Obtain a dictionary representation from the iptc.Rule + >>> iptc.easy.decode_iptc_rule(rule) + {'tcp': {'dport': '22'}, 'protocol': 'tcp', 'comment': {'comment': 'Match tcp.22'}, 'target': 'ACCEPT'} + + Known Issues ============ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/__init__.py new/python-iptables-1.0.1/iptc/__init__.py --- old/python-iptables-0.13.0/iptc/__init__.py 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/iptc/__init__.py 2023-01-22 21:50:31.000000000 +0100 @@ -10,6 +10,7 @@ from iptc.ip4tc import (is_table_available, Table, Chain, Rule, Match, Target, Policy, IPTCError) from iptc.ip6tc import is_table6_available, Table6, Rule6 from iptc.errors import * +import iptc.easy __all__ = [] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/easy.py new/python-iptables-1.0.1/iptc/easy.py --- old/python-iptables-0.13.0/iptc/easy.py 1970-01-01 01:00:00.000000000 +0100 +++ new/python-iptables-1.0.1/iptc/easy.py 2023-01-22 21:50:31.000000000 +0100 @@ -0,0 +1,461 @@ +# -*- coding: utf-8 -*- + +# TODO: +# - Add documentation +# - Add HowToUse examples + +from .ip4tc import Rule, Table, Chain, IPTCError +from .ip6tc import Rule6, Table6 + +_BATCH_MODE = False + +def flush_all(ipv6=False): + """ Flush all available tables """ + for table in get_tables(ipv6): + flush_table(table, ipv6) + +def flush_table(table, ipv6=False, raise_exc=True): + """ Flush a table """ + try: + iptc_table = _iptc_gettable(table, ipv6) + iptc_table.flush() + except Exception as e: + if raise_exc: raise + +def flush_chain(table, chain, ipv6=False, raise_exc=True): + """ Flush a chain in table """ + try: + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_chain.flush() + except Exception as e: + if raise_exc: raise + +def zero_all(table, ipv6=False): + """ Zero all tables """ + for table in get_tables(ipv6): + zero_table(table, ipv6) + +def zero_table(table, ipv6=False): + """ Zero a table """ + iptc_table = _iptc_gettable(table, ipv6) + iptc_table.zero_entries() + +def zero_chain(table, chain, ipv6=False): + """ Zero a chain in table """ + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_chain.zero_counters() + +def has_chain(table, chain, ipv6=False): + """ Return True if chain exists in table False otherwise """ + return _iptc_gettable(table, ipv6).is_chain(chain) + +def has_rule(table, chain, rule_d, ipv6=False): + """ Return True if rule exists in chain False otherwise """ + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_rule = encode_iptc_rule(rule_d, ipv6) + return iptc_rule in iptc_chain.rules + +def add_chain(table, chain, ipv6=False, raise_exc=True): + """ Return True if chain was added successfully to a table, raise Exception otherwise """ + try: + iptc_table = _iptc_gettable(table, ipv6) + iptc_table.create_chain(chain) + return True + except Exception as e: + if raise_exc: raise + return False + +def add_rule(table, chain, rule_d, position=0, ipv6=False): + """ Add a rule to a chain in a given position. 0=append, 1=first, n=nth position """ + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_rule = encode_iptc_rule(rule_d, ipv6) + if position == 0: + # Insert rule in last position -> append + iptc_chain.append_rule(iptc_rule) + elif position > 0: + # Insert rule in given position -> adjusted as iptables CLI + iptc_chain.insert_rule(iptc_rule, position - 1) + elif position < 0: + # Insert rule in given position starting from bottom -> not available in iptables CLI + nof_rules = len(iptc_chain.rules) + _position = position + nof_rules + # Insert at the top if the position has looped over + if _position <= 0: + _position = 0 + iptc_chain.insert_rule(iptc_rule, _position) + +def insert_rule(table, chain, rule_d, ipv6=False): + """ Add a rule to a chain in the 1st position """ + add_rule(table, chain, rule_d, position=1, ipv6=ipv6) + +def delete_chain(table, chain, ipv6=False, flush=False, raise_exc=True): + """ Delete a chain """ + try: + if flush: + flush_chain(table, chain, ipv6, raise_exc) + iptc_table = _iptc_gettable(table, ipv6) + iptc_table.delete_chain(chain) + except Exception as e: + if raise_exc: raise + +def delete_rule(table, chain, rule_d, ipv6=False, raise_exc=True): + """ Delete a rule from a chain """ + try: + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_rule = encode_iptc_rule(rule_d, ipv6) + iptc_chain.delete_rule(iptc_rule) + except Exception as e: + if raise_exc: raise + +def get_tables(ipv6=False): + """ Get all tables """ + iptc_tables = _iptc_gettables(ipv6) + return [t.name for t in iptc_tables] + +def get_chains(table, ipv6=False): + """ Return the existing chains of a table """ + iptc_table = _iptc_gettable(table, ipv6) + return [iptc_chain.name for iptc_chain in iptc_table.chains] + +def get_rule(table, chain, position=0, ipv6=False, raise_exc=True): + """ Get a rule from a chain in a given position. 0=all rules, 1=first, n=nth position """ + try: + if position == 0: + # Return all rules + return dump_chain(table, chain, ipv6) + elif position > 0: + # Return specific rule by position + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_rule = iptc_chain.rules[position - 1] + return decode_iptc_rule(iptc_rule, ipv6) + elif position < 0: + # Return last rule -> not available in iptables CLI + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_rule = iptc_chain.rules[position] + return decode_iptc_rule(iptc_rule, ipv6) + except Exception as e: + if raise_exc: raise + +def replace_rule(table, chain, old_rule_d, new_rule_d, ipv6=False): + """ Replaces an existing rule of a chain """ + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_old_rule = encode_iptc_rule(old_rule_d, ipv6) + iptc_new_rule = encode_iptc_rule(new_rule_d, ipv6) + iptc_chain.replace_rule(iptc_new_rule, iptc_chain.rules.index(iptc_old_rule)) + +def get_rule_counters(table, chain, rule_d, ipv6=False): + """ Return a tuple with the rule counters (numberOfBytes, numberOfPackets) """ + if not has_rule(table, chain, rule_d, ipv6): + raise AttributeError('Chain <{}@{}> has no rule <{}>'.format(chain, table, rule_d)) + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_rule = encode_iptc_rule(rule_d, ipv6) + iptc_rule_index = iptc_chain.rules.index(iptc_rule) + return iptc_chain.rules[iptc_rule_index].get_counters() + +def get_rule_position(table, chain, rule_d, ipv6=False): + """ Return the position of a rule within a chain """ + if not has_rule(table, chain, rule_d): + raise AttributeError('Chain <{}@{}> has no rule <{}>'.format(chain, table, rule_d)) + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_rule = encode_iptc_rule(rule_d, ipv6) + return iptc_chain.rules.index(iptc_rule) + + +def test_rule(rule_d, ipv6=False): + """ Return True if the rule is a well-formed dictionary, False otherwise """ + try: + encode_iptc_rule(rule_d, ipv6) + return True + except: + return False + +def test_match(name, value, ipv6=False): + """ Return True if the match is valid, False otherwise """ + try: + iptc_rule = Rule6() if ipv6 else Rule() + _iptc_setmatch(iptc_rule, name, value) + return True + except: + return False + +def test_target(name, value, ipv6=False): + """ Return True if the target is valid, False otherwise """ + try: + iptc_rule = Rule6() if ipv6 else Rule() + _iptc_settarget(iptc_rule, {name:value}) + return True + except: + return False + + +def get_policy(table, chain, ipv6=False): + """ Return the default policy of chain in a table """ + iptc_chain = _iptc_getchain(table, chain, ipv6) + return iptc_chain.get_policy().name + +def set_policy(table, chain, policy='ACCEPT', ipv6=False): + """ Set the default policy of chain in a table """ + iptc_chain = _iptc_getchain(table, chain, ipv6) + iptc_chain.set_policy(policy) + + +def dump_all(ipv6=False): + """ Return a dictionary representation of all tables """ + return {table: dump_table(table, ipv6) for table in get_tables(ipv6)} + +def dump_table(table, ipv6=False): + """ Return a dictionary representation of a table """ + return {chain: dump_chain(table, chain, ipv6) for chain in get_chains(table, ipv6)} + +def dump_chain(table, chain, ipv6=False): + """ Return a list with the dictionary representation of the rules of a table """ + iptc_chain = _iptc_getchain(table, chain, ipv6) + return [decode_iptc_rule(iptc_rule, ipv6) for iptc_rule in iptc_chain.rules] + + +def batch_begin(table = None, ipv6=False): + """ Disable autocommit on a table """ + _BATCH_MODE = True + if table: + tables = (table, ) + else: + tables = get_tables(ipv6) + for table in tables: + iptc_table = _iptc_gettable(table, ipv6) + iptc_table.autocommit = False + +def batch_end(table = None, ipv6=False): + """ Enable autocommit on table and commit changes """ + _BATCH_MODE = False + if table: + tables = (table, ) + else: + tables = get_tables(ipv6) + for table in tables: + iptc_table = _iptc_gettable(table, ipv6) + iptc_table.autocommit = True + +def batch_add_chains(table, chains, ipv6=False, flush=True): + """ Add multiple chains to a table """ + iptc_table = _batch_begin_table(table, ipv6) + for chain in chains: + if iptc_table.is_chain(chain): + iptc_chain = Chain(iptc_table, chain) + else: + iptc_chain = iptc_table.create_chain(chain) + if flush: + iptc_chain.flush() + _batch_end_table(table, ipv6) + +def batch_delete_chains(table, chains, ipv6=False): + """ Delete multiple chains of a table """ + iptc_table = _batch_begin_table(table, ipv6) + for chain in chains: + if iptc_table.is_chain(chain): + iptc_chain = Chain(iptc_table, chain) + iptc_chain.flush() + iptc_table.delete_chain(chain) + _batch_end_table(table, ipv6) + +def batch_add_rules(table, batch_rules, ipv6=False): + """ Add multiple rules to a table with format (chain, rule_d, position) """ + iptc_table = _batch_begin_table(table, ipv6) + for (chain, rule_d, position) in batch_rules: + iptc_chain = Chain(iptc_table, chain) + iptc_rule = encode_iptc_rule(rule_d, ipv6) + if position == 0: + # Insert rule in last position -> append + iptc_chain.append_rule(iptc_rule) + elif position > 0: + # Insert rule in given position -> adjusted as iptables CLI + iptc_chain.insert_rule(iptc_rule, position-1) + elif position < 0: + # Insert rule in given position starting from bottom -> not available in iptables CLI + nof_rules = len(iptc_chain.rules) + iptc_chain.insert_rule(iptc_rule, position + nof_rules) + _batch_end_table(table, ipv6) + +def batch_delete_rules(table, batch_rules, ipv6=False, raise_exc=True): + """ Delete multiple rules from table with format (chain, rule_d) """ + try: + iptc_table = _batch_begin_table(table, ipv6) + for (chain, rule_d) in batch_rules: + iptc_chain = Chain(iptc_table, chain) + iptc_rule = encode_iptc_rule(rule_d, ipv6) + iptc_chain.delete_rule(iptc_rule) + _batch_end_table(table, ipv6) + except Exception as e: + if raise_exc: raise + + +def encode_iptc_rule(rule_d, ipv6=False): + """ Return a Rule(6) object from the input dictionary """ + # Sanity check + assert(isinstance(rule_d, dict)) + # Basic rule attributes + rule_attr = ('src', 'dst', 'protocol', 'in-interface', 'out-interface', 'fragment') + iptc_rule = Rule6() if ipv6 else Rule() + # Set default target + rule_d.setdefault('target', '') + # Avoid issues with matches that require basic parameters to be configured first + for name in rule_attr: + if name in rule_d: + setattr(iptc_rule, name.replace('-', '_'), rule_d[name]) + for name, value in rule_d.items(): + try: + if name in rule_attr: + continue + elif name == 'counters': + _iptc_setcounters(iptc_rule, value) + elif name == 'target': + _iptc_settarget(iptc_rule, value) + else: + _iptc_setmatch(iptc_rule, name, value) + except Exception as e: + #print('Ignoring unsupported field <{}:{}>'.format(name, value)) + continue + return iptc_rule + +def decode_iptc_rule(iptc_rule, ipv6=False): + """ Return a dictionary representation of the Rule(6) object + Note: host IP addresses are appended their corresponding CIDR """ + d = {} + if ipv6==False and iptc_rule.src != '0.0.0.0/0.0.0.0': + _ip, _netmask = iptc_rule.src.split('/') + _netmask = _netmask_v4_to_cidr(_netmask) + d['src'] = '{}/{}'.format(_ip, _netmask) + elif ipv6==True and iptc_rule.src != '::/0': + d['src'] = iptc_rule.src + if ipv6==False and iptc_rule.dst != '0.0.0.0/0.0.0.0': + _ip, _netmask = iptc_rule.dst.split('/') + _netmask = _netmask_v4_to_cidr(_netmask) + d['dst'] = '{}/{}'.format(_ip, _netmask) + elif ipv6==True and iptc_rule.dst != '::/0': + d['dst'] = iptc_rule.dst + if iptc_rule.protocol != 'ip': + d['protocol'] = iptc_rule.protocol + if iptc_rule.in_interface is not None: + d['in-interface'] = iptc_rule.in_interface + if iptc_rule.out_interface is not None: + d['out-interface'] = iptc_rule.out_interface + if ipv6 == False and iptc_rule.fragment: + d['fragment'] = iptc_rule.fragment + for m in iptc_rule.matches: + if m.name not in d: + d[m.name] = m.get_all_parameters() + elif isinstance(d[m.name], list): + d[m.name].append(m.get_all_parameters()) + else: + d[m.name] = [d[m.name], m.get_all_parameters()] + if iptc_rule.target and iptc_rule.target.name and len(iptc_rule.target.get_all_parameters()): + name = iptc_rule.target.name.replace('-', '_') + d['target'] = {name:iptc_rule.target.get_all_parameters()} + elif iptc_rule.target and iptc_rule.target.name: + if iptc_rule.target.goto: + d['target'] = {'goto':iptc_rule.target.name} + else: + d['target'] = iptc_rule.target.name + # Get counters + d['counters'] = iptc_rule.counters + # Return a filtered dictionary + return _filter_empty_field(d) + +### INTERNAL FUNCTIONS ### +def _iptc_table_available(table, ipv6=False): + """ Return True if the table is available, False otherwise """ + try: + iptc_table = Table6(table) if ipv6 else Table(table) + return True + except: + return False + +def _iptc_gettables(ipv6=False): + """ Return an updated view of all available iptc_table """ + iptc_cls = Table6 if ipv6 else Table + return [_iptc_gettable(t, ipv6) for t in iptc_cls.ALL if _iptc_table_available(t, ipv6)] + +def _iptc_gettable(table, ipv6=False): + """ Return an updated view of an iptc_table """ + iptc_table = Table6(table) if ipv6 else Table(table) + if _BATCH_MODE is False: + iptc_table.commit() + iptc_table.refresh() + return iptc_table + +def _iptc_getchain(table, chain, ipv6=False, raise_exc=True): + """ Return an iptc_chain of an updated table """ + try: + iptc_table = _iptc_gettable(table, ipv6) + if not iptc_table.is_chain(chain): + raise AttributeError('Table <{}> has no chain <{}>'.format(table, chain)) + return Chain(iptc_table, chain) + except Exception as e: + if raise_exc: raise + +def _iptc_setcounters(iptc_rule, value): + # Value is a tuple (numberOfBytes, numberOfPackets) + iptc_rule.counters = value + +def _iptc_setmatch(iptc_rule, name, value): + # Iterate list/tuple recursively + if isinstance(value, list) or isinstance(value, tuple): + for inner_value in value: + _iptc_setmatch(iptc_rule, name, inner_value) + # Assign dictionary value + elif isinstance(value, dict): + iptc_match = iptc_rule.create_match(name) + [iptc_match.set_parameter(k, v) for k, v in value.items()] + # Assign value directly + else: + iptc_match = iptc_rule.create_match(name) + iptc_match.set_parameter(name, value) + +def _iptc_settarget(iptc_rule, value): + # Target is dictionary - Use only 1st pair key/value + if isinstance(value, dict): + t_name, t_value = next(iter(value.items())) + if t_name == 'goto': + iptc_target = iptc_rule.create_target(t_value, goto=True) + else: + iptc_target = iptc_rule.create_target(t_name) + [iptc_target.set_parameter(k, v) for k, v in t_value.items()] + # Simple target + else: + iptc_target = iptc_rule.create_target(value) + +def _batch_begin_table(table, ipv6=False): + """ Disable autocommit on a table """ + iptc_table = _iptc_gettable(table, ipv6) + iptc_table.autocommit = False + return iptc_table + +def _batch_end_table(table, ipv6=False): + """ Enable autocommit on table and commit changes """ + iptc_table = _iptc_gettable(table, ipv6) + iptc_table.autocommit = True + return iptc_table + +def _filter_empty_field(data_d): + """ + Remove empty lists from dictionary values + Before: {'target': {'CHECKSUM': {'checksum-fill': []}}} + After: {'target': {'CHECKSUM': {'checksum-fill': ''}}} + Before: {'tcp': {'dport': ['22']}}} + After: {'tcp': {'dport': '22'}}} + """ + for k, v in data_d.items(): + if isinstance(v, dict): + data_d[k] = _filter_empty_field(v) + elif isinstance(v, list) and len(v) != 0: + v = [_filter_empty_field(_v) if isinstance(_v, dict) else _v for _v in v ] + if isinstance(v, list) and len(v) == 1: + data_d[k] = v.pop() + elif isinstance(v, list) and len(v) == 0: + data_d[k] = '' + return data_d + +def _netmask_v4_to_cidr(netmask_addr): + # Implement Subnet Mask conversion without dependencies + return sum([bin(int(x)).count('1') for x in netmask_addr.split('.')]) + +### /INTERNAL FUNCTIONS ### diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/ip4tc.py new/python-iptables-1.0.1/iptc/ip4tc.py --- old/python-iptables-0.13.0/iptc/ip4tc.py 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/iptc/ip4tc.py 2023-01-22 21:50:31.000000000 +0100 @@ -9,7 +9,7 @@ import struct import weakref -from .util import find_library, load_kernel +from .util import find_library, load_kernel, find_libc from .xtables import (XT_INV_PROTO, NFPROTO_IPV4, XTablesError, xtables, xt_align, xt_counters, xt_entry_target, xt_entry_match) @@ -26,7 +26,7 @@ _IFNAMSIZ = 16 -_libc = ct.CDLL("libc.so.6") +_libc = find_libc() _get_errno_loc = _libc.__errno_location _get_errno_loc.restype = ct.POINTER(ct.c_int) _malloc = _libc.malloc @@ -419,6 +419,7 @@ res = shlex.split(buf) res.reverse() inv = False + key = None while len(res) > 0: x = res.pop() if x == '!': @@ -433,7 +434,11 @@ params[key] = [] inv = False continue - params[key].append(x) # This is a parameter value. + # At this point key should be set, unless the output from save is + # not formatted right. Let's be defensive, since some users + # reported that problem. + if key is not None: + params[key].append(x) # This is a parameter value. return params def _update_parameters(self): @@ -574,9 +579,7 @@ def __eq__(self, match): basesz = ct.sizeof(xt_entry_match) - if (self.match.u.match_size == match.match.u.match_size and - self.match.u.user.name == match.match.u.user.name and - self.match.u.user.revision == match.match.u.user.revision and + if (self.name == match.name and self.match_buf[basesz:self.usersize] == match.match_buf[basesz:match.usersize]): return True @@ -1064,9 +1067,6 @@ saddr = _a_to_i(socket.inet_pton(socket.AF_INET, addr)) except socket.error: raise ValueError("invalid address %s" % (addr)) - ina = in_addr() - ina.s_addr = ct.c_uint32(saddr) - self.entry.ip.src = ina if not netm.isdigit(): try: @@ -1080,8 +1080,11 @@ nmask = socket.htonl((2 ** imask - 1) << (32 - imask)) neta = in_addr() neta.s_addr = ct.c_uint32(nmask) - self.entry.ip.smsk = neta + # Apply subnet mask to IP address + ina = in_addr() + ina.s_addr = ct.c_uint32(saddr & nmask) + self.entry.ip.src = ina src = property(get_src, set_src) """This is the source network address with an optional network mask in @@ -1125,9 +1128,6 @@ daddr = _a_to_i(socket.inet_pton(socket.AF_INET, addr)) except socket.error: raise ValueError("invalid address %s" % (addr)) - ina = in_addr() - ina.s_addr = ct.c_uint32(daddr) - self.entry.ip.dst = ina if not netm.isdigit(): try: @@ -1142,6 +1142,10 @@ neta = in_addr() neta.s_addr = ct.c_uint32(nmask) self.entry.ip.dmsk = neta + # Apply subnet mask to IP address + ina = in_addr() + ina.s_addr = ct.c_uint32(daddr & nmask) + self.entry.ip.dst = ina dst = property(get_dst, set_dst) """This is the destination network address with an optional network mask @@ -1282,6 +1286,15 @@ counters = self.entry.counters return counters.pcnt, counters.bcnt + def set_counters(self, counters): + """This method set a tuple pair of the packet and byte counters of + the rule.""" + self.entry.counters.pcnt = counters[0] + self.entry.counters.bcnt = counters[1] + + counters = property(get_counters, set_counters) + """This is the packet and byte counters of the rule.""" + # override the following three for the IPv6 subclass def _entry_size(self): return xt_align(ct.sizeof(ipt_entry)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/util.py new/python-iptables-1.0.1/iptc/util.py --- old/python-iptables-0.13.0/iptc/util.py 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/iptc/util.py 2023-01-22 21:50:31.000000000 +0100 @@ -80,12 +80,19 @@ def _find_library(*names): + exts = [] if version_info >= (3, 3): - ext = get_config_var("EXT_SUFFIX") + exts.append(get_config_var("EXT_SUFFIX")) else: - ext = get_config_var('SO') + exts.append(get_config_var('SO')) + + if version_info >= (3, 5): + exts.append('.so') + for name in names: - libnames = [name, "lib" + name, name + ext, "lib" + name + ext] + libnames = [name, "lib" + name] + for ext in exts: + libnames += [name + ext, "lib" + name + ext] libdir = os.environ.get('IPTABLES_LIBDIR', None) if libdir is not None: libdirs = libdir.split(':') @@ -109,3 +116,19 @@ major = int(m.group(1)) return lib, major return None, None + + +def find_libc(): + lib = ctypes.util.find_library('c') + if lib is not None: + return ctypes.CDLL(lib, mode=ctypes.RTLD_GLOBAL) + + libnames = ['libc.so.6', 'libc.so.0', 'libc.so'] + for name in libnames: + try: + lib = ctypes.CDLL(name, mode=ctypes.RTLD_GLOBAL) + return lib + except: + pass + + return None diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/version.py new/python-iptables-1.0.1/iptc/version.py --- old/python-iptables-0.13.0/iptc/version.py 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/iptc/version.py 2023-01-22 21:50:31.000000000 +0100 @@ -1,4 +1,4 @@ # -*- coding: utf-8 -*- __pkgname__ = "python-iptables" -__version__ = "0.13.0" +__version__ = "1.0.1" diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/xtables.py new/python-iptables-1.0.1/iptc/xtables.py --- old/python-iptables-0.13.0/iptc/xtables.py 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/iptc/xtables.py 2023-01-22 21:50:31.000000000 +0100 @@ -6,7 +6,7 @@ import weakref from . import version -from .util import find_library +from .util import find_library, find_libc from .errors import * XT_INV_PROTO = 0x40 # invert the sense of PROTO @@ -792,7 +792,7 @@ ("v12", _xtables_target_v12)] -_libc, _ = find_library("c") +_libc = find_libc() _optind = ct.c_long.in_dll(_libc, "optind") _optarg = ct.c_char_p.in_dll(_libc, "optarg") @@ -805,10 +805,10 @@ _xtables_libdir = os.getenv("XTABLES_LIBDIR") if _xtables_libdir is None: import re - ldconfig_path_regex = re.compile('^(/.*):$') + ldconfig_path_regex = re.compile('^(/.*):($| \(.*\)$)') import subprocess ldconfig = subprocess.Popen( - ('ldconfig', '-N', '-v'), + ('/sbin/ldconfig', '-N', '-v'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) ldconfig_out, ldconfig_err = ldconfig.communicate() @@ -1020,10 +1020,6 @@ if ext is not None: return ext - xtables._xtables_matches.value = ct.c_void_p(None).value - if xtables._xtables_pending_matches: - xtables._xtables_pending_matches.value = ct.c_void_p(None).value - match = xtables._xtables_find_match(name, XTF_TRY_LOAD, None) if not match: self._try_register(name) @@ -1045,10 +1041,6 @@ if ext is not None: return ext - xtables._xtables_targets.value = ct.c_void_p(None).value - if xtables._xtables_pending_targets: - xtables._xtables_pending_targets.value = ct.c_void_p(None).value - target = xtables._xtables_find_target(name, XTF_TRY_LOAD) if not target: self._try_register(name) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/setup.py new/python-iptables-1.0.1/setup.py --- old/python-iptables-0.13.0/setup.py 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/setup.py 2023-01-22 21:50:31.000000000 +0100 @@ -15,6 +15,8 @@ name=__pkgname__, version=__version__, description="Python bindings for iptables", + long_description="Python bindings for classic iptables", + long_description_content_type="text/x-rst", author="Vilmos Nebehaj", author_email="v.nebe...@gmail.com", url="https://github.com/ldx/python-iptables", @@ -38,10 +40,7 @@ "Topic :: System :: Networking :: Firewalls", "Topic :: System :: Systems Administration", "Programming Language :: Python :: 2", - "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.3", - "Programming Language :: Python :: 3.4", "Programming Language :: Python :: Implementation :: CPython", ], license="Apache License, Version 2.0", diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/tests/test_iptc.py new/python-iptables-1.0.1/tests/test_iptc.py --- old/python-iptables-0.13.0/tests/test_iptc.py 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/tests/test_iptc.py 2023-01-22 21:50:31.000000000 +0100 @@ -574,6 +574,25 @@ self.failUnless(rule in crules) crules.remove(rule) + def test_rule_to_dict(self): + rule = iptc.Rule6() + rule.protocol = "tcp" + rule.src = "::1/128" + target = iptc.Target(rule, "ACCEPT") + rule.target = target + rule_d = iptc.easy.decode_iptc_rule(rule, ipv6=True) + # Remove counters when comparing rules + rule_d.pop('counters', None) + self.assertEqual(rule_d, {"protocol": "tcp", "src": "::1/128", "target": "ACCEPT"}) + + def test_rule_from_dict(self): + rule = iptc.Rule6() + rule.protocol = "tcp" + rule.src = "::1/128" + target = iptc.Target(rule, "ACCEPT") + rule.target = target + rule2 = iptc.easy.encode_iptc_rule({"protocol": "tcp", "src": "::1/128", "target": "ACCEPT"}, ipv6=True) + self.assertEqual(rule, rule2) class TestRule(unittest.TestCase): def setUp(self): @@ -603,13 +622,13 @@ def test_rule_address(self): # valid addresses rule = iptc.Rule() - for addr in [("127.0.0.1/255.255.255.0", "127.0.0.1/255.255.255.0"), - ("!127.0.0.1/255.255.255.0", "!127.0.0.1/255.255.255.0"), - ("127.0.0.1/255.255.128.0", "127.0.0.1/255.255.128.0"), - ("127.0.0.1/16", "127.0.0.1/255.255.0.0"), - ("127.0.0.1/24", "127.0.0.1/255.255.255.0"), - ("127.0.0.1/17", "127.0.0.1/255.255.128.0"), - ("!127.0.0.1/17", "!127.0.0.1/255.255.128.0")]: + for addr in [("127.0.0.1/255.255.255.0", "127.0.0.0/255.255.255.0"), + ("!127.0.0.1/255.255.255.0", "!127.0.0.0/255.255.255.0"), + ("127.0.0.1/255.255.128.0", "127.0.0.0/255.255.128.0"), + ("127.0.0.1/16", "127.0.0.0/255.255.0.0"), + ("127.0.0.1/24", "127.0.0.0/255.255.255.0"), + ("127.0.0.1/17", "127.0.0.0/255.255.128.0"), + ("!127.0.0.1/17", "!127.0.0.0/255.255.128.0")]: rule.src = addr[0] self.assertEquals(rule.src, addr[1]) rule.dst = addr[0] @@ -915,6 +934,25 @@ self.table_nat.commit() self.table_nat.refresh() + def test_rule_to_dict(self): + rule = iptc.Rule() + rule.protocol = "tcp" + rule.src = "127.0.0.1/32" + target = iptc.Target(rule, "ACCEPT") + rule.target = target + rule_d = iptc.easy.decode_iptc_rule(rule) + # Remove counters when comparing rules + rule_d.pop('counters', None) + self.assertEqual(rule_d, {"protocol": "tcp", "src": "127.0.0.1/32", "target": "ACCEPT"}) + + def test_rule_from_dict(self): + rule = iptc.Rule() + rule.protocol = "tcp" + rule.src = "127.0.0.1/32" + target = iptc.Target(rule, "ACCEPT") + rule.target = target + rule2 = iptc.easy.encode_iptc_rule({"protocol": "tcp", "src": "127.0.0.1/32", "target": "ACCEPT"}) + self.assertEqual(rule, rule2) def suite(): suite_table6 = unittest.TestLoader().loadTestsFromTestCase(TestTable6) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/tests/test_matches.py new/python-iptables-1.0.1/tests/test_matches.py --- old/python-iptables-0.13.0/tests/test_matches.py 2018-04-17 02:19:37.000000000 +0200 +++ new/python-iptables-1.0.1/tests/test_matches.py 2023-01-22 21:50:31.000000000 +0100 @@ -19,7 +19,7 @@ match = rule.create_match("udp") for m in rule.matches: - self.failUnless(m == match) + self.assertEqual(m, match) # check that we can change match parameters after creation match.sport = "12345:55555" @@ -29,7 +29,7 @@ m.sport = "12345:55555" m.dport = "!33333" - self.failUnless(m == match) + self.assertEqual(m, match) def test_match_compare(self): m1 = iptc.Match(iptc.Rule(), "udp") @@ -40,28 +40,28 @@ m2.sport = "12345:55555" m2.dport = "!33333" - self.failUnless(m1 == m2) + self.assertEqual(m1, m2) m2.reset() m2.sport = "12345:55555" m2.dport = "33333" - self.failIf(m1 == m2) + self.assertNotEqual(m1, m2) def test_match_parameters(self): m = iptc.Match(iptc.Rule(), "udp") m.sport = "12345:55555" m.dport = "!33333" - self.failUnless(len(m.parameters) == 2) + self.assertEqual(len(m.parameters), 2) for p in m.parameters: - self.failUnless(p == "sport" or p == "dport") + self.assertTrue(p == "sport" or p == "dport") - self.failUnless(m.parameters["sport"] == "12345:55555") - self.failUnless(m.parameters["dport"] == "!33333") + self.assertEqual(m.parameters["sport"], "12345:55555") + self.assertEqual(m.parameters["dport"], "!33333") m.reset() - self.failUnless(len(m.parameters) == 0) + self.assertEqual(len(m.parameters), 0) def test_get_all_parameters(self): m = iptc.Match(iptc.Rule(), "udp") @@ -69,8 +69,8 @@ m.dport = "!33333" params = m.get_all_parameters() - self.assertEquals(set(params['sport']), set(['12345:55555'])) - self.assertEquals(set(params['dport']), set(['!', '33333'])) + self.assertEqual(set(params['sport']), set(['12345:55555'])) + self.assertEqual(set(params['dport']), set(['!', '33333'])) class TestMultiportMatch(unittest.TestCase): @@ -103,14 +103,14 @@ self.chain.insert_rule(self.rule) rule = self.chain.rules[0] match = rule.matches[0] - self.assertEquals(match.dports, '1111,2222') + self.assertEqual(match.dports, '1111,2222') def test_unicode_multiport(self): self.match.dports = u'1111,2222' self.chain.insert_rule(self.rule) rule = self.chain.rules[0] match = rule.matches[0] - self.assertEquals(match.dports, '1111,2222') + self.assertEqual(match.dports, '1111,2222') class TestXTUdpMatch(unittest.TestCase): @@ -135,9 +135,9 @@ "!12345:12346", "0:1234", "! 1234", "!0:12345", "!1234:65535"]: self.match.sport = port - self.assertEquals(self.match.sport, port.replace(" ", "")) + self.assertEqual(self.match.sport, port.replace(" ", "")) self.match.dport = port - self.assertEquals(self.match.dport, port.replace(" ", "")) + self.assertEqual(self.match.dport, port.replace(" ", "")) self.match.reset() for port in ["-1", "asdf", "!asdf"]: try: @@ -188,7 +188,7 @@ def test_mark(self): for mark in ["0x7b", "! 0x7b", "0x7b/0xfffefffe", "!0x7b/0xff00ff00"]: self.match.mark = mark - self.assertEquals(self.match.mark, mark.replace(" ", "")) + self.assertEqual(self.match.mark, mark.replace(" ", "")) self.match.reset() for mark in ["0xffffffffff", "123/0xffffffff1", "!asdf", "1234:1233"]: try: @@ -232,7 +232,7 @@ def test_limit(self): for limit in ["1/sec", "5/min", "3/hour"]: self.match.limit = limit - self.assertEquals(self.match.limit, limit) + self.assertEqual(self.match.limit, limit) self.match.reset() for limit in ["asdf", "123/1", "!1", "!1/second"]: try: @@ -284,7 +284,7 @@ def test_icmpv6(self): self.chain.insert_rule(self.rule) rule = self.chain.rules[0] - self.assertEquals(self.rule, rule) + self.assertEqual(self.rule, rule) class TestCommentMatch(unittest.TestCase): @@ -310,7 +310,7 @@ self.match.reset() self.match.comment = comment self.chain.insert_rule(self.rule) - self.assertEquals(self.match.comment, comment) + self.assertEqual(self.match.comment, comment) class TestIprangeMatch(unittest.TestCase): @@ -389,8 +389,10 @@ self.chain.insert_rule(self.rule) rule = self.chain.rules[0] m = rule.matches[0] - self.assertEquals(m.name, "state") - self.assertEquals(m.state, "RELATED,ESTABLISHED") + self.assertEqual(m.name, "state") + self.assertEqual(m.state, "RELATED,ESTABLISHED") + self.assertEqual(rule.matches[0].name, self.rule.matches[0].name) + self.assertEqual(rule, self.rule) class TestXTConntrackMatch(unittest.TestCase): @@ -425,7 +427,7 @@ rule = self.chain.rules[0] m = rule.matches[0] self.assertTrue(m.name, ["conntrack"]) - self.assertEquals(m.ctstate, "NEW,RELATED") + self.assertEqual(m.ctstate, "NEW,RELATED") class TestHashlimitMatch(unittest.TestCase): @@ -464,11 +466,42 @@ rule = self.chain.rules[0] m = rule.matches[0] self.assertTrue(m.name, ["hashlimit"]) - self.assertEquals(m.hashlimit_name, "foo") - self.assertEquals(m.hashlimit_mode, "srcip") - self.assertEquals(m.hashlimit_upto, "200/sec") - self.assertEquals(m.hashlimit_burst, "5") + self.assertEqual(m.hashlimit_name, "foo") + self.assertEqual(m.hashlimit_mode, "srcip") + self.assertEqual(m.hashlimit_upto, "200/sec") + self.assertEqual(m.hashlimit_burst, "5") +class TestRecentMatch(unittest.TestCase): + def setUp(self): + self.table = 'filter' + self.chain = 'iptc_test_recent' + iptc.easy.delete_chain(self.table, self.chain, ipv6=False, flush=True, raise_exc=False) + iptc.easy.add_chain(self.table, self.chain, ipv6=False, raise_exc=True) + + def tearDown(self): + iptc.easy.delete_chain(self.table, self.chain, ipv6=False, flush=True, raise_exc=False) + + def test_recent(self): + rule_d = { + 'protocol': 'udp', + 'recent': { + 'mask': '255.255.255.255', + 'update': '', + 'seconds': '60', + 'rsource': '', + 'name': 'UDP-PORTSCAN', + }, + 'target': { + 'REJECT':{ + 'reject-with': 'icmp-port-unreachable' + } + } + } + iptc.easy.add_rule(self.table, self.chain, rule_d) + rule2_d = iptc.easy.get_rule(self.table, self.chain, -1) + # Remove counters when comparing rules + rule2_d.pop('counters', None) + self.assertEqual(rule_d, rule2_d) def suite(): suite_match = unittest.TestLoader().loadTestsFromTestCase(TestMatch) @@ -486,6 +519,8 @@ TestXTConntrackMatch) suite_hashlimit = unittest.TestLoader().loadTestsFromTestCase( TestHashlimitMatch) + suite_recent = unittest.TestLoader().loadTestsFromTestCase( + TestRecentMatch) extra_suites = [] if is_table6_available(iptc.Table6.FILTER): extra_suites += unittest.TestLoader().loadTestsFromTestCase( @@ -494,7 +529,7 @@ return unittest.TestSuite([suite_match, suite_udp, suite_mark, suite_limit, suite_mport, suite_comment, suite_iprange, suite_state, suite_conntrack, - suite_hashlimit] + extra_suites) + suite_hashlimit, suite_recent] + extra_suites) def run_tests():