# Copyright (C) Internet Systems Consortium, Inc. ("ISC") # # SPDX-License-Identifier: MPL-2.0 # # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, you can obtain one at https://mozilla.org/MPL/2.0/. # # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. from functools import total_ordering import glob import os from pathlib import Path import re import subprocess import time from typing import Dict, List, Optional, Tuple, Union from datetime import datetime, timedelta, timezone import dns import dns.tsig import isctest.log import isctest.query import isctest.util DEFAULT_TTL = 300 NEXT_KEY_EVENT_THRESHOLD = 100 def _query(server, qname, qtype, tsig=None): query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) if tsig is not None: tsigkey = tsig.split(":") keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0]) query.use_tsig(keyring) try: response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3) except dns.exception.Timeout: isctest.log.debug(f"query timeout for query {qname} {qtype} to {server.ip}") return None return response @total_ordering class KeyTimingMetadata: """ Represent a single timing information for a key. These objects can be easily compared, support addition and subtraction of timedelta objects or integers(value in seconds). A lack of timing metadata in the key (value 0) should be represented with None rather than an instance of this object. """ FORMAT = "%Y%m%d%H%M%S" def __init__(self, timestamp: str): if int(timestamp) <= 0: raise ValueError(f'invalid timing metadata value: "{timestamp}"') self.value = datetime.strptime(timestamp, self.FORMAT).replace( tzinfo=timezone.utc ) def __repr__(self): return self.value.strftime(self.FORMAT) def __str__(self) -> str: return self.value.strftime(self.FORMAT) def __add__(self, other: Union[timedelta, int]): if isinstance(other, int): other = timedelta(seconds=other) result = KeyTimingMetadata.__new__(KeyTimingMetadata) result.value = self.value + other return result def __sub__(self, other: Union[timedelta, int]): if isinstance(other, int): other = timedelta(seconds=other) result = KeyTimingMetadata.__new__(KeyTimingMetadata) result.value = self.value - other return result def __iadd__(self, other: Union[timedelta, int]): if isinstance(other, int): other = timedelta(seconds=other) self.value += other def __isub__(self, other: Union[timedelta, int]): if isinstance(other, int): other = timedelta(seconds=other) self.value -= other def __lt__(self, other: "KeyTimingMetadata"): return self.value < other.value def __eq__(self, other: object): return isinstance(other, KeyTimingMetadata) and self.value == other.value @staticmethod def now() -> "KeyTimingMetadata": result = KeyTimingMetadata.__new__(KeyTimingMetadata) result.value = datetime.now(timezone.utc) return result class KeyProperties: """ Represent the (expected) properties a key should have. """ def __init__( self, name: str, properties: dict, metadata: dict, timing: Dict[str, KeyTimingMetadata], ): self.name = name self.key = None self.properties = properties self.metadata = metadata self.timing = timing def __repr__(self): return self.name def __str__(self) -> str: return self.name @staticmethod def default(with_state=True) -> "KeyProperties": properties = { "expect": True, "private": True, "legacy": False, "role": "csk", "role_full": "key-signing", "dnskey_ttl": 3600, "flags": 257, } metadata = { "Algorithm": isctest.vars.algorithms.ECDSAP256SHA256.number, "Length": 256, "Lifetime": 0, "KSK": "yes", "ZSK": "yes", } timing: Dict[str, KeyTimingMetadata] = {} result = KeyProperties( name="DEFAULT", properties=properties, metadata=metadata, timing=timing ) result.name = "DEFAULT" result.key = None if with_state: result.metadata["GoalState"] = "omnipresent" result.metadata["DNSKEYState"] = "rumoured" result.metadata["KRRSIGState"] = "rumoured" result.metadata["ZRRSIGState"] = "rumoured" result.metadata["DSState"] = "hidden" return result def Ipub(self, config): ipub = timedelta(0) if self.key.get_metadata("Predecessor", must_exist=False) != "undefined": # Ipub = Dprp + TTLkey ipub = ( config["dnskey-ttl"] + config["zone-propagation-delay"] + config["publish-safety"] ) self.timing["Active"] = self.timing["Published"] + ipub def IpubC(self, config): if not self.key.is_ksk(): return ttl1 = config["dnskey-ttl"] + config["publish-safety"] ttl2 = timedelta(0) if self.key.get_metadata("Predecessor", must_exist=False) == "undefined": # If this is the first key, we also need to wait until the zone # signatures are omnipresent. Use max-zone-ttl instead of # dnskey-ttl, and no publish-safety (because we are looking at # signatures here, not the public key). ttl2 = config["max-zone-ttl"] # IpubC = DprpC + TTLkey ipubc = config["zone-propagation-delay"] + max(ttl1, ttl2) self.timing["PublishCDS"] = self.timing["Published"] + ipubc if self.metadata["Lifetime"] != 0: self.timing["DeleteCDS"] = ( self.timing["PublishCDS"] + self.metadata["Lifetime"] ) def Iret(self, config): if self.metadata["Lifetime"] == 0: return sign_delay = config["signatures-validity"] - config["signatures-refresh"] safety_interval = config["retire-safety"] iretKSK = timedelta(0) iretZSK = timedelta(0) if self.key.is_ksk(): # Iret = DprpP + TTLds iretKSK = ( config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval ) if self.key.is_zsk(): # Iret = Dsgn + Dprp + TTLsig iretZSK = ( sign_delay + config["zone-propagation-delay"] + config["max-zone-ttl"] + safety_interval ) self.timing["Removed"] = self.timing["Retired"] + max(iretKSK, iretZSK) def set_expected_keytimes(self, config, offset=None, pregenerated=False): if self.key is None: raise ValueError("KeyProperties must be attached to a Key") if self.properties["legacy"]: return if offset is None: offset = self.properties["offset"] self.timing["Generated"] = self.key.get_timing("Created") self.timing["Published"] = self.timing["Generated"] if pregenerated: self.timing["Published"] = self.key.get_timing("Publish") self.timing["Published"] = self.timing["Published"] + offset self.Ipub(config) # Set Retired timing metadata if key has lifetime. if self.metadata["Lifetime"] != 0: self.timing["Retired"] = self.timing["Active"] + self.metadata["Lifetime"] self.IpubC(config) self.Iret(config) # Key state change times must exist, but since we cannot reliably tell # when named made the actual state change, we don't care what the # value is. Set it to None will verify that the metadata exists, but # without actual checking the value. self.timing["DNSKEYChange"] = None if self.key.is_ksk(): self.timing["DSChange"] = None self.timing["KRRSIGChange"] = None if self.key.is_zsk(): self.timing["ZRRSIGChange"] = None @total_ordering class Key: """ Represent a key from a keyfile. This object keeps track of its origin (keydir + name), can be used to retrieve metadata from the underlying files and supports convenience operations for KASP tests. """ def __init__(self, name: str, keydir: Optional[Union[str, Path]] = None): self.name = name if keydir is None: self.keydir = Path() else: self.keydir = Path(keydir) self.path = str(self.keydir / name) self.privatefile = f"{self.path}.private" self.keyfile = f"{self.path}.key" self.statefile = f"{self.path}.state" self.tag = int(self.name[-5:]) def get_timing( self, metadata: str, must_exist: bool = True ) -> Optional[KeyTimingMetadata]: regex = rf";\s+{metadata}:\s+(\d+).*" with open(self.keyfile, "r", encoding="utf-8") as file: for line in file: match = re.match(regex, line) if match is not None: try: return KeyTimingMetadata(match.group(1)) except ValueError: break if must_exist: raise ValueError( f'timing metadata "{metadata}" for key "{self.name}" invalid' ) return None def get_metadata( self, metadata: str, file=None, comment=False, must_exist=True ) -> str: if file is None: file = self.statefile value = "undefined" regex = rf"{metadata}:\s+(\S+).*" if comment: # The expected metadata is prefixed with a ';'. regex = rf";\s+{metadata}:\s+(\S+).*" with open(file, "r", encoding="utf-8") as fp: for line in fp: match = re.match(regex, line) if match is not None: value = match.group(1) break if must_exist and value == "undefined": raise ValueError( f'metadata "{metadata}" for key "{self.name}" in file "{file}" undefined' ) return value def get_signing_state( self, offline_ksk=False, zsk_missing=False ) -> Tuple[bool, bool]: """ This returns the signing state derived from the key states, KRRSIGState and ZRRSIGState. If 'offline_ksk' is set to True, we determine the signing state from the timing metadata. If 'zsigning' is True, ensure the current time is between the Active and Retired timing metadata. If 'zsk_missing' is set to True, it means the ZSK private key file is missing, and the KSK should take over signing the RRset, and the expected zone signing state (zsigning) is reversed. """ # Fetch key timing metadata. now = KeyTimingMetadata.now() activate = self.get_timing("Activate") assert activate is not None # to silence mypy - its implied by line above inactive = self.get_timing("Inactive", must_exist=False) active = now >= activate retired = inactive is not None and inactive <= now signing = active and not retired # Fetch key state metadata. krrsigstate = self.get_metadata("KRRSIGState", must_exist=False) ksigning = krrsigstate in ["rumoured", "omnipresent"] zrrsigstate = self.get_metadata("ZRRSIGState", must_exist=False) zsigning = zrrsigstate in ["rumoured", "omnipresent"] if ksigning: assert self.is_ksk() if zsigning: assert self.is_zsk() # If the ZSK private key file is missing, revers the zone signing state. if zsk_missing: zsigning = not zsigning # If testing offline KSK, retrieve the signing state from the key timing # metadata. if offline_ksk and signing and self.is_zsk(): assert zsigning if offline_ksk and signing and self.is_ksk(): ksigning = signing return ksigning, zsigning def ttl(self) -> int: with open(self.keyfile, "r", encoding="utf-8") as file: for line in file: if line.startswith(";"): continue return int(line.split()[1]) return 0 def dnskey(self): with open(self.keyfile, "r", encoding="utf-8") as file: for line in file: if "DNSKEY" in line: return line.strip() return "undefined" def is_ksk(self) -> bool: return self.get_metadata("KSK") == "yes" def is_zsk(self) -> bool: return self.get_metadata("ZSK") == "yes" def dnskey_equals(self, value, cdnskey=False): dnskey = value.split() if cdnskey: # fourth element is the rrtype assert dnskey[3] == "CDNSKEY" dnskey[3] = "DNSKEY" dnskey_fromfile = [] rdata = " ".join(dnskey[:7]) with open(self.keyfile, "r", encoding="utf-8") as file: for line in file: if f"{rdata}" in line: dnskey_fromfile = line.split() pubkey_fromfile = "".join(dnskey_fromfile[7:]) pubkey_fromwire = "".join(dnskey[7:]) return pubkey_fromfile == pubkey_fromwire def cds_equals(self, value, alg): cds = value.split() dsfromkey_command = [ os.environ.get("DSFROMKEY"), "-T", str(self.ttl()), "-a", alg, "-C", "-w", str(self.keyfile), ] out = isctest.run.cmd(dsfromkey_command, log_stdout=True) dsfromkey = out.stdout.decode("utf-8").split() rdata_fromfile = " ".join(dsfromkey[:7]) rdata_fromwire = " ".join(cds[:7]) if rdata_fromfile != rdata_fromwire: isctest.log.debug( f"CDS RDATA MISMATCH: {rdata_fromfile} - {rdata_fromwire}" ) return False digest_fromfile = "".join(dsfromkey[7:]).lower() digest_fromwire = "".join(cds[7:]).lower() if digest_fromfile != digest_fromwire: isctest.log.debug( f"CDS DIGEST MISMATCH: {digest_fromfile} - {digest_fromwire}" ) return False return digest_fromfile == digest_fromwire def is_metadata_consistent(self, key, metadata, checkval=True): """ If 'key' exists in 'metadata' then it must also exist in the state meta data. Otherwise, it must not exist in the state meta data. If 'checkval' is True, the meta data values must also match. """ if key in metadata: if checkval: value = self.get_metadata(key) if value != f"{metadata[key]}": isctest.log.debug( f"{self.name} {key} METADATA MISMATCH: {value} - {metadata[key]}" ) return value == f"{metadata[key]}" return self.get_metadata(key) != "undefined" value = self.get_metadata(key, must_exist=False) if value != "undefined": isctest.log.debug(f"{self.name} {key} METADATA UNEXPECTED: {value}") return value == "undefined" def is_timing_consistent(self, key, timing, file, comment=False): """ If 'key' exists in 'timing' then it must match the value in the state timing data. Otherwise, it must also not exist in the state timing data. """ if key in timing: value = self.get_metadata(key, file=file, comment=comment) if value != str(timing[key]): isctest.log.debug( f"{self.name} {key} TIMING MISMATCH: {value} - {timing[key]}" ) return value == str(timing[key]) value = self.get_metadata(key, file=file, comment=comment, must_exist=False) if value != "undefined": isctest.log.debug(f"{self.name} {key} TIMING UNEXPECTED: {value}") return value == "undefined" def match_properties(self, zone, properties): """ Check the key with given properties. """ if not properties.properties["expect"]: return False # Check file existence. # Noop. If file is missing then the get_metadata calls will fail. # Check the public key file. role = properties.properties["role_full"] comment = f"This is a {role} key, keyid {self.tag}, for {zone}." if not isctest.util.file_contents_contain(self.keyfile, comment): isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'") return False ttl = properties.properties["dnskey_ttl"] flags = properties.properties["flags"] alg = properties.metadata["Algorithm"] dnskey = f"{zone}. {ttl} IN DNSKEY {flags} 3 {alg}" if not isctest.util.file_contents_contain(self.keyfile, dnskey): isctest.log.debug(f"{self.name} DNSKEY MISMATCH: expected '{dnskey}'") return False # Now check the private key file. if properties.properties["private"]: # Retrieve creation date. created = self.get_metadata("Generated") pval = self.get_metadata("Created", file=self.privatefile) if pval != created: isctest.log.debug( f"{self.name} Created METADATA MISMATCH: {pval} - {created}" ) return False pval = self.get_metadata("Private-key-format", file=self.privatefile) if pval != "v1.3": isctest.log.debug( f"{self.name} Private-key-format METADATA MISMATCH: {pval} - v1.3" ) return False pval = self.get_metadata("Algorithm", file=self.privatefile) if pval != f"{alg}": isctest.log.debug( f"{self.name} Algorithm METADATA MISMATCH: {pval} - {alg}" ) return False # Now check the key state file. if properties.properties["legacy"]: return True comment = f"This is the state of key {self.tag}, for {zone}." if not isctest.util.file_contents_contain(self.statefile, comment): isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'") return False attributes = [ "Lifetime", "Algorithm", "Length", "KSK", "ZSK", "GoalState", "DNSKEYState", "KRRSIGState", "ZRRSIGState", "DSState", ] for key in attributes: if not self.is_metadata_consistent(key, properties.metadata): return False # A match is found. return True def match_timingmetadata(self, timings, file=None, comment=False): if file is None: file = self.statefile attributes = [ "Generated", "Created", "Published", "Publish", "PublishCDS", "SyncPublish", "Active", "Activate", "Retired", "Inactive", "Revoked", "Removed", "Delete", ] for key in attributes: if not self.is_timing_consistent(key, timings, file, comment=comment): isctest.log.debug(f"{self.name} TIMING METADATA MISMATCH: {key}") return False return True def __lt__(self, other: "Key"): return self.name < other.name def __eq__(self, other: object): return isinstance(other, Key) and self.path == other.path def __repr__(self): return self.path def check_zone_is_signed(server, zone, tsig=None): addr = server.ip fqdn = f"{zone}." # wait until zone is fully signed signed = False for _ in range(10): response = _query(server, fqdn, dns.rdatatype.NSEC, tsig=tsig) if not isinstance(response, dns.message.Message): isctest.log.debug(f"no response for {fqdn} NSEC from {addr}") elif response.rcode() != dns.rcode.NOERROR: rcode = dns.rcode.to_text(response.rcode()) isctest.log.debug(f"{rcode} response for {fqdn} NSEC from {addr}") else: has_nsec = False has_rrsig = False for rr in response.answer: if not has_nsec: has_nsec = rr.match( dns.name.from_text(fqdn), dns.rdataclass.IN, dns.rdatatype.NSEC, dns.rdatatype.NONE, ) if not has_rrsig: has_rrsig = rr.match( dns.name.from_text(fqdn), dns.rdataclass.IN, dns.rdatatype.RRSIG, dns.rdatatype.NSEC, ) if not has_nsec: isctest.log.debug( f"missing apex {fqdn} NSEC record in response from {addr}" ) if not has_rrsig: isctest.log.debug( f"missing {fqdn} NSEC signature in response from {addr}" ) signed = has_nsec and has_rrsig if signed: break time.sleep(1) assert signed def check_keys(zone, keys, expected): """ Checks keys for a configured zone. This verifies: 1. The expected number of keys exist in 'keys'. 2. The keys match the expected properties. """ def _verify_keys(): # check number of keys matches expected. if len(keys) != len(expected): return False if len(keys) == 0: return True for expect in expected: expect.key = None for key in keys: found = False i = 0 while not found and i < len(expected): if expected[i].key is None: found = key.match_properties(zone, expected[i]) if found: key.external = expected[i].properties["legacy"] expected[i].key = key i += 1 if not found: return False return True isctest.run.retry_with_timeout(_verify_keys, timeout=10) def check_keytimes(keys, expected): """ Check the key timing metadata for all keys in 'keys'. """ assert len(keys) == len(expected) if len(keys) == 0: return for key in keys: for expect in expected: if expect.properties["legacy"]: continue if not key is expect.key: continue synonyms = {} if "Generated" in expect.timing: synonyms["Created"] = expect.timing["Generated"] if "Published" in expect.timing: synonyms["Publish"] = expect.timing["Published"] if "PublishCDS" in expect.timing: synonyms["SyncPublish"] = expect.timing["PublishCDS"] if "Active" in expect.timing: synonyms["Activate"] = expect.timing["Active"] if "Retired" in expect.timing: synonyms["Inactive"] = expect.timing["Retired"] if "DeleteCDS" in expect.timing: synonyms["SyncDelete"] = expect.timing["DeleteCDS"] if "Revoked" in expect.timing: synonyms["Revoked"] = expect.timing["Revoked"] if "Removed" in expect.timing: synonyms["Delete"] = expect.timing["Removed"] assert key.match_timingmetadata(synonyms, file=key.keyfile, comment=True) if expect.properties["private"]: assert key.match_timingmetadata(synonyms, file=key.privatefile) if not expect.properties["legacy"]: assert key.match_timingmetadata(expect.timing) state_changes = [ "DNSKEYChange", "KRRSIGChange", "ZRRSIGChange", "DSChange", ] for change in state_changes: assert key.is_metadata_consistent( change, expect.timing, checkval=False ) def check_keyrelationships(keys, expected): """ Check the key relationships (Successor and Predecessor metadata). """ for key in keys: for expect in expected: if expect.properties["legacy"]: continue if not key is expect.key: continue relationship_status = ["Predecessor", "Successor"] for status in relationship_status: assert key.is_metadata_consistent(status, expect.metadata) def check_dnssec_verify(server, zone, tsig=None): # Check if zone if DNSSEC valid with dnssec-verify. fqdn = f"{zone}." verified = False for _ in range(10): transfer = _query(server, fqdn, dns.rdatatype.AXFR, tsig=tsig) if not isinstance(transfer, dns.message.Message): isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}") elif transfer.rcode() != dns.rcode.NOERROR: rcode = dns.rcode.to_text(transfer.rcode()) isctest.log.debug(f"{rcode} response for {fqdn} AXFR from {server.ip}") else: zonefile = f"{zone}.axfr" with open(zonefile, "w", encoding="utf-8") as file: for rr in transfer.answer: file.write(rr.to_text()) file.write("\n") try: verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile] verified = isctest.run.cmd(verify_command) except subprocess.CalledProcessError: pass if verified: break time.sleep(1) assert verified def check_dnssecstatus(server, zone, keys, policy=None, view=None): # Call rndc dnssec -status on 'server' for 'zone'. Expect 'policy' in # the output. This is a loose verification, it just tests if the right # policy name is returned, and if all expected keys are listed. response = "" if view is None: response = server.rndc(f"dnssec -status {zone}", log=False) else: response = server.rndc(f"dnssec -status {zone} in {view}", log=False) if policy is None: assert "Zone does not have dnssec-policy" in response return assert f"dnssec-policy: {policy}" in response for key in keys: assert f"key: {key.tag}" in response def _check_signatures( signatures, covers, fqdn, keys, offline_ksk=False, zsk_missing=False ): numsigs = 0 zrrsig = True if covers in [dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY, dns.rdatatype.CDS]: zrrsig = False krrsig = not zrrsig for key in keys: ksigning, zsigning = key.get_signing_state( offline_ksk=offline_ksk, zsk_missing=zsk_missing ) alg = key.get_metadata("Algorithm") rtype = dns.rdatatype.to_text(covers) expect = rf"IN RRSIG {rtype} {alg} (\d) (\d+) (\d+) (\d+) {key.tag} {fqdn}" if zrrsig and zsigning: has_rrsig = False for rrsig in signatures: if re.search(expect, rrsig) is not None: has_rrsig = True break assert has_rrsig, f"Expected signature but not found: {expect}" numsigs += 1 if zrrsig and not zsigning: for rrsig in signatures: assert re.search(expect, rrsig) is None if krrsig and ksigning: has_rrsig = False for rrsig in signatures: if re.search(expect, rrsig) is not None: has_rrsig = True break assert has_rrsig, f"Expected signature but not found: {expect}" numsigs += 1 if krrsig and not ksigning: for rrsig in signatures: assert re.search(expect, rrsig) is None return numsigs def check_signatures( rrset, covers, fqdn, ksks, zsks, offline_ksk=False, zsk_missing=False ): # Check if signatures with covering type are signed with the right keys. # The right keys are the ones that expect a signature and have the # correct role. numsigs = 0 signatures = [] for rr in rrset: for rdata in rr: rdclass = dns.rdataclass.to_text(rr.rdclass) rdtype = dns.rdatatype.to_text(rr.rdtype) rrsig = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}" signatures.append(rrsig) numsigs += _check_signatures( signatures, covers, fqdn, ksks, offline_ksk=offline_ksk, zsk_missing=zsk_missing ) numsigs += _check_signatures( signatures, covers, fqdn, zsks, offline_ksk=offline_ksk, zsk_missing=zsk_missing ) assert numsigs == len(signatures) def _check_dnskeys(dnskeys, keys, cdnskey=False): now = KeyTimingMetadata.now() numkeys = 0 publish_md = "Publish" delete_md = "Delete" if cdnskey: publish_md = f"Sync{publish_md}" delete_md = f"Sync{delete_md}" for key in keys: publish = key.get_timing(publish_md, must_exist=False) delete = key.get_timing(delete_md, must_exist=False) published = publish is not None and now >= publish removed = delete is not None and delete <= now if not published or removed: for dnskey in dnskeys: assert not key.dnskey_equals(dnskey, cdnskey=cdnskey) continue has_dnskey = False for dnskey in dnskeys: if key.dnskey_equals(dnskey, cdnskey=cdnskey): has_dnskey = True break if not cdnskey: assert has_dnskey if has_dnskey: numkeys += 1 return numkeys def check_dnskeys(rrset, ksks, zsks, cdnskey=False): # Check if the correct DNSKEY records are published. If the current time # is between the timing metadata 'publish' and 'delete', the key must have # a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY # records instead. numkeys = 0 dnskeys = [] for rr in rrset: for rdata in rr: rdclass = dns.rdataclass.to_text(rr.rdclass) rdtype = dns.rdatatype.to_text(rr.rdtype) dnskey = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}" dnskeys.append(dnskey) numkeys += _check_dnskeys(dnskeys, ksks, cdnskey=cdnskey) if not cdnskey: numkeys += _check_dnskeys(dnskeys, zsks) assert numkeys == len(dnskeys) def check_cds(rrset, keys): # Check if the correct CDS records are published. If the current time # is between the timing metadata 'publish' and 'delete', the key must have # a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY # records instead. now = KeyTimingMetadata.now() numcds = 0 cdss = [] for rr in rrset: for rdata in rr: rdclass = dns.rdataclass.to_text(rr.rdclass) rdtype = dns.rdatatype.to_text(rr.rdtype) cds = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}" cdss.append(cds) for key in keys: assert key.is_ksk() publish = key.get_timing("SyncPublish") delete = key.get_timing("SyncDelete", must_exist=False) published = now >= publish removed = delete is not None and delete <= now if not published or removed: for cds in cdss: assert not key.cds_equals(cds, "SHA-256") continue has_cds = False for cds in cdss: if key.cds_equals(cds, "SHA-256"): has_cds = True break assert has_cds numcds += 1 assert numcds == len(cdss) def _query_rrset(server, fqdn, qtype, tsig=None): response = _query(server, fqdn, qtype, tsig=tsig) assert response.rcode() == dns.rcode.NOERROR rrs = [] rrsigs = [] for rrset in response.answer: if rrset.match( dns.name.from_text(fqdn), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype ): rrsigs.append(rrset) elif rrset.match( dns.name.from_text(fqdn), dns.rdataclass.IN, qtype, dns.rdatatype.NONE ): rrs.append(rrset) else: assert False return rrs, rrsigs def check_apex( server, zone, ksks, zsks, offline_ksk=False, zsk_missing=False, tsig=None ): # Test the apex of a zone. This checks that the SOA and DNSKEY RRsets # are signed correctly and with the appropriate keys. fqdn = f"{zone}." # test dnskey query dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig) check_dnskeys(dnskeys, ksks, zsks) check_signatures( rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk ) # test soa query soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig) assert len(soa) == 1 assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text() check_signatures( rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks, offline_ksk=offline_ksk, zsk_missing=zsk_missing, ) # test cdnskey query cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig) check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True) if len(cdnskeys) > 0: assert len(rrsigs) > 0 check_signatures( rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks, offline_ksk=offline_ksk ) # test cds query cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig) check_cds(cds, ksks) if len(cds) > 0: assert len(rrsigs) > 0 check_signatures( rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks, offline_ksk=offline_ksk ) def check_subdomain(server, zone, ksks, zsks, offline_ksk=False, tsig=None): # Test an RRset below the apex and verify it is signed correctly. fqdn = f"{zone}." qname = f"a.{zone}." qtype = dns.rdatatype.A response = _query(server, qname, qtype, tsig=tsig) assert response.rcode() == dns.rcode.NOERROR match = f"{qname} {DEFAULT_TTL} IN A 10.0.0.1" rrsigs = [] for rrset in response.answer: if rrset.match( dns.name.from_text(qname), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype ): rrsigs.append(rrset) else: assert match in rrset.to_text() check_signatures(rrsigs, qtype, fqdn, ksks, zsks, offline_ksk=offline_ksk) def verify_update_is_signed(server, fqdn, qname, qtype, rdata, ksks, zsks, tsig=None): """ Test an RRset below the apex and verify it is updated and signed correctly. """ response = _query(server, qname, qtype, tsig=tsig) if response.rcode() != dns.rcode.NOERROR: return False rrtype = dns.rdatatype.to_text(qtype) match = f"{qname} {DEFAULT_TTL} IN {rrtype} {rdata}" rrsigs = [] for rrset in response.answer: if rrset.match( dns.name.from_text(qname), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype ): rrsigs.append(rrset) elif not match in rrset.to_text(): return False if len(rrsigs) == 0: return False # Zone is updated, ready to verify the signatures. check_signatures(rrsigs, qtype, fqdn, ksks, zsks) return True def verify_rrsig_is_refreshed( server, fqdn, zonefile, qname, qtype, ksks, zsks, tsig=None ): """ Verify signature for RRset has been refreshed. """ response = _query(server, qname, qtype, tsig=tsig) if response.rcode() != dns.rcode.NOERROR: return False rrtype = dns.rdatatype.to_text(qtype) match = f"{qname}. {DEFAULT_TTL} IN {rrtype}" rrsigs = [] for rrset in response.answer: if rrset.match( dns.name.from_text(qname), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype ): rrsigs.append(rrset) elif not match in rrset.to_text(): return False if len(rrsigs) == 0: return False tmp_zonefile = f"{zonefile}.tmp" isctest.run.cmd( [ os.environ["CHECKZONE"], "-D", "-q", "-o", tmp_zonefile, "-f", "raw", fqdn, zonefile, ], ) zone = dns.zone.from_file(tmp_zonefile, fqdn) for rrsig in rrsigs: if isctest.util.zone_contains(zone, rrsig): return False # Zone is updated, ready to verify the signatures. check_signatures(rrsigs, qtype, fqdn, ksks, zsks) return True def verify_rrsig_is_reused(server, fqdn, zonefile, qname, qtype, ksks, zsks, tsig=None): """ Verify signature for RRset has been reused. """ response = _query(server, qname, qtype, tsig=tsig) assert response.rcode() == dns.rcode.NOERROR rrtype = dns.rdatatype.to_text(qtype) match = f"{qname}. {DEFAULT_TTL} IN {rrtype}" rrsigs = [] for rrset in response.answer: if rrset.match( dns.name.from_text(qname), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype ): rrsigs.append(rrset) else: assert match in rrset.to_text() tmp_zonefile = f"{zonefile}.tmp" isctest.run.cmd( [ os.environ["CHECKZONE"], "-D", "-q", "-o", tmp_zonefile, "-f", "raw", fqdn, zonefile, ], ) zone = dns.zone.from_file(tmp_zonefile, dns.name.from_text(fqdn), relativize=False) for rrsig in rrsigs: assert isctest.util.zone_contains(zone, rrsig) check_signatures(rrsigs, qtype, fqdn, ksks, zsks) def next_key_event_equals(server, zone, next_event): if next_event is None: # No next key event check. return True val = int(next_event.total_seconds()) if val == 3600: waitfor = rf".*zone {zone}.*: next key event in (.*) seconds" else: # Don't want default loadkeys interval. waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds" with server.watch_log_from_start() as watcher: watcher.wait_for_line(re.compile(waitfor)) # WMM: The with code below is extracting the line the watcher was # waiting for. If WatchLog.wait_for_line()` returned the matched string, # we can use it directly on `re.match`. next_found = False minval = val - NEXT_KEY_EVENT_THRESHOLD maxval = val + NEXT_KEY_EVENT_THRESHOLD with open(f"{server.identifier}/named.run", "r", encoding="utf-8") as fp: for line in fp: match = re.match(waitfor, line) if match is not None: nextval = int(match.group(1)) if minval <= nextval <= maxval: next_found = True break isctest.log.debug( f"check next key event: expected {val} in: {line.strip()}" ) return next_found def keydir_to_keylist( zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False ) -> List[Key]: """ Retrieve all keys from the key files in a directory. If 'zone' is None, retrieve all keys in the directory, otherwise only those matching the zone name. If 'keydir' is None, search the current directory. """ if zone is None: zone = "" all_keys = [] if keydir is None: regex = rf"(K{zone}\.\+.*\+.*)\.key" for filename in glob.glob(f"K{zone}.+*+*.key"): match = re.match(regex, filename) if match is not None: all_keys.append(Key(match.group(1))) else: regex = rf"{keydir}/(K{zone}\.\+.*\+.*)\.key" for filename in glob.glob(f"{keydir}/K{zone}.+*+*.key"): match = re.match(regex, filename) if match is not None: all_keys.append(Key(match.group(1), keydir)) states = ["GoalState", "DNSKEYState", "KRRSIGState", "ZRRSIGState", "DSState"] def used(kk): if not in_use: return True for state in states: val = kk.get_metadata(state, must_exist=False) if val not in ["undefined", "hidden"]: isctest.log.debug(f"key {kk} in use") return True return False return [k for k in all_keys if used(k)] def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]: return [Key(name, keydir) for name in keystr.split()] def policy_to_properties(ttl, keys: List[str]) -> List[KeyProperties]: """ Get the policies from a list of specially formatted strings. The splitted line should result in the following items: line[0]: Role line[1]: Lifetime line[2]: Algorithm line[3]: Length Then, optional data for specific tests may follow: - "goal", "dnskey", "krrsig", "zrrsig", "ds", followed by a value, sets the given state to the specific value - "missing", set if the private key file for this key is not available. - "offset", an offset for testing key rollover timings """ proplist = [] count = 0 for key in keys: count += 1 line = key.split() keyprop = KeyProperties(f"KEY{count}", {}, {}, {}) keyprop.properties["expect"] = True keyprop.properties["private"] = True keyprop.properties["legacy"] = False keyprop.properties["offset"] = timedelta(0) keyprop.properties["role"] = line[0] if line[0] == "zsk": keyprop.properties["role_full"] = "zone-signing" keyprop.properties["flags"] = 256 keyprop.metadata["ZSK"] = "yes" keyprop.metadata["KSK"] = "no" else: keyprop.properties["role_full"] = "key-signing" keyprop.properties["flags"] = 257 keyprop.metadata["ZSK"] = "yes" if line[0] == "csk" else "no" keyprop.metadata["KSK"] = "yes" keyprop.properties["dnskey_ttl"] = ttl keyprop.metadata["Algorithm"] = line[2] keyprop.metadata["Length"] = line[3] keyprop.metadata["Lifetime"] = 0 if line[1] != "unlimited": keyprop.metadata["Lifetime"] = int(line[1]) for i in range(4, len(line)): if line[i].startswith("goal:"): keyval = line[i].split(":") keyprop.metadata["GoalState"] = keyval[1] elif line[i].startswith("dnskey:"): keyval = line[i].split(":") keyprop.metadata["DNSKEYState"] = keyval[1] elif line[i].startswith("krrsig:"): keyval = line[i].split(":") keyprop.metadata["KRRSIGState"] = keyval[1] elif line[i].startswith("zrrsig:"): keyval = line[i].split(":") keyprop.metadata["ZRRSIGState"] = keyval[1] elif line[i].startswith("ds:"): keyval = line[i].split(":") keyprop.metadata["DSState"] = keyval[1] elif line[i].startswith("offset:"): keyval = line[i].split(":") keyprop.properties["offset"] = timedelta(seconds=int(keyval[1])) elif line[i] == "missing": keyprop.properties["private"] = False else: assert False, f"undefined optional data {line[i]}" proplist.append(keyprop) return proplist