corruptor.py with syntax coloring

This HTML file was generated with Kalle's syntaxcolor.py


   1"""
   2Corrupts random bytes in a file.
   3By Kalle (http://qalle.net)
   4"""
   5
   6import sys
   7import os.path
   8import random
   9
  10SUFFIXES = {
  11    "k": 2 ** 10,
  12    "M": 2 ** 20,
  13    "G": 2 ** 30,
  14    "T": 2 ** 40,
  15}
  16
  17# buffer size in bytes when copying data
  18FILE_BUFFER_SIZE = 2 ** 20
  19
# Usage text; main() exits with this message when the number of command line
# arguments is wrong. The trailing backslashes suppress the newlines right
# after the opening quotes and right before the closing quotes.
HELP_TEXT = """\
Corrupts random bytes in a file. No byte is changed more than once.

WARNING: running corrupt executables may damage your computer. Only run them
under a sandbox/emulator where they have limited permissions.

Args: SourceFile TargetFile Method NumberOfBytes [StartAddress [Length]]
    SourceFile
        Name of file to read.
    TargetFile
        Name of file to (over)write.
    Method
        How to change the value of each byte (argument is case insensitive):
            A     add one (0xFF overflows to 0x00)
            S     subtract one (0x00 underflows to 0xFF)
            X     flip all bits (XOR with 0xFF)
            Xn    XOR with n (n = hexadecimal number 01...FF)
            B     flip one randomly selected bit
            Bn    flip n randomly selected bits (n = 1...8)
            R     randomize (replace with any value but the original)
    NumberOfBytes
        How many bytes to corrupt.
    StartAddress
        Start address of chunk to corrupt (default = 0):
            Zero or greater: relative to start of file (0 = first byte).
            Negative: relative to end of file (-1 = last byte).
    Length
        Length of chunk to corrupt (default = 0):
            Positive: absolute length.
            Zero or negative: up to end of file minus |Length| bytes.

Note: the following suffixes may be used with NumberOfBytes, StartAddress and
Length (case sensitive):
    k = kilo (2**10)
    M = mega (2**20)
    G = giga (2**30)
    T = tera (2**40)\
"""
  58
  59def decode_method(arg):
  60    """
  61    Validate&decode Method argument to tuple (mainType, subType).
  62        mainType: one of "A", "S", "X", "B", "R".
  63        subType: int or None.
  64    E.g. x80 -> (X, 0x80).
  65    Raise ValueError on error.
  66    """
  67
  68    if arg == "":
  69        raise ValueError
  70
  71    (mainType, subTypeStr) = (arg[0].upper(), arg[1:])
  72
  73    # validate&convert subTypeStr
  74    if mainType == "X" and subTypeStr == "":
  75        subType = 0xff
  76    elif mainType == "X":
  77        subType = int(subTypeStr, 16)
  78        if not 0x01 <= subType <= 0xff:
  79            raise ValueError
  80    elif mainType == "B" and subTypeStr == "":
  81        subType = 1
  82    elif mainType == "B":
  83        subType = int(subTypeStr)
  84        if not 1 <= subType <= 8:
  85            raise ValueError
  86    elif mainType in ("A", "S", "R") and subTypeStr == "":
  87        subType = None
  88    else:
  89        raise ValueError
  90
  91    return (mainType, subType)
  92
  93def decode_numeric_arg(arg):
  94    """E.g. -2k -> -2048; raise ValueError on error"""
  95
  96    if arg == "":
  97        raise ValueError
  98
  99    multiplier = SUFFIXES.get(arg[-1], 1)
 100    if multiplier != 1:
 101        arg = arg[:-1]
 102
 103    return int(arg) * multiplier
 104
 105def create_line_format(maxAddr):
 106    """Create format code for printable lines."""
 107
 108    maxDecimalDigits = len(str(maxAddr))
 109    maxHexadecimalDigits = len(format(maxAddr, "x"))
 110
 111    return (
 112        "address {address:" + str(maxDecimalDigits) + "d} "
 113        "(0x{address:0" + str(maxHexadecimalDigits) + "x}): "
 114        "0x{originalValue:02x} -> 0x{newValue:02x}"
 115    )
 116
 117def copy_chunk(sourceHnd, targetHnd, length):
 118    # copy length bytes from source handle to target handle
 119
 120    bytesLeft = length
 121    while bytesLeft > 0:
 122        chunkSize = min(bytesLeft, FILE_BUFFER_SIZE)
 123        chunk = sourceHnd.read(chunkSize)
 124        targetHnd.write(chunk)
 125        bytesLeft -= chunkSize
 126
 127def corrupt_byte(byte, method):
 128    (mainType, subType) = method
 129
 130    if mainType == "A":
 131        return (byte + 1) & 0xff
 132
 133    if mainType == "S":
 134        return (byte - 1) & 0xff
 135
 136    if mainType == "X":
 137        if subType is None:
 138            return byte ^ 0xff
 139        return byte ^ subType
 140
 141    if mainType == "B":
 142        if subType is None:
 143            subType = 1
 144        for exponent in random.sample(range(8), subType):
 145            byte ^= 2 ** exponent
 146        return byte
 147
 148    if mainType == "R":
 149        return random.choice(list(set(range(256)) - set([byte])))
 150
 151    exit("Invalid method.")
 152
 153def corrupt_file(sourceHnd, targetHnd, settings):
 154    startAddress  = settings["startAddress"]
 155    length        = settings["length"]
 156    numberOfBytes = settings["numberOfBytes"]
 157    method        = settings["method"]
 158
 159    # get source file size
 160    size = sourceHnd.seek(0, 2)
 161
 162    # convert start address to absolute
 163    if startAddress < 0:
 164        # startAddress = size - (-startAddress)
 165        startAddress += size
 166    if startAddress < 0:
 167        exit("Error: start address is less than zero.")
 168    if startAddress > size - 1:
 169        exit("Error: start address is greater than file size.")
 170
 171    # convert length to absolute
 172    if length < 1:
 173        # length = size - 1 - (-length) - startAddress + 1
 174        length += size - startAddress
 175    if length < 1:
 176        exit("Error: length is less than one.")
 177    if startAddress + length > size:
 178        exit("Error: end of chunk extends past end of file.")
 179    if numberOfBytes > length:
 180        exit("Error: tried to corrupt more bytes than there are in the chunk.")
 181
 182    # select offsets to corrupt (relative to startAddress)
 183    offsetsToCorrupt = sorted(random.sample(range(length), numberOfBytes))
 184
 185    # create format code for printable lines
 186    lineFormat = create_line_format(startAddress + max(offsetsToCorrupt))
 187
 188    sourceHnd.seek(0)
 189    targetHnd.seek(0)
 190    prevAddress = -1
 191
 192    for offset in offsetsToCorrupt:
 193        address = startAddress + offset
 194
 195        # copy bytes between previous and current corrupt byte
 196        bytesToCopy = address - prevAddress - 1
 197        copy_chunk(sourceHnd, targetHnd, bytesToCopy)
 198        prevAddress = address
 199
 200        # corrupt© byte
 201        byte = ord(sourceHnd.read(1))
 202        newByte = corrupt_byte(byte, method)
 203        print(lineFormat.format(
 204            address = address,
 205            originalValue = byte,
 206            newValue = newByte,
 207        ))
 208        targetHnd.write(bytes([newByte]))
 209
 210    # copy bytes after last corrupt byte
 211    bytesToCopy = size - prevAddress - 1
 212    copy_chunk(sourceHnd, targetHnd, bytesToCopy)
 213
 214    if sourceHnd.tell() != size or targetHnd.tell() != size:
 215        exit("Error: unexpectedly, wrong number of bytes was read or written.")
 216
 217def main():
 218    # exit if invalid number of args
 219    if not 5 <= len(sys.argv) <= 7:
 220        exit(HELP_TEXT)
 221
 222    # read args
 223    (source, target, methodRaw, numberOfBytesStr) = sys.argv[1:5]
 224    startAddressStr = sys.argv[5] if len(sys.argv) >= 6 else "0"
 225    lengthStr = sys.argv[6] if len(sys.argv) >= 7 else "0"
 226
 227    # decode method
 228    try:
 229        method = decode_method(methodRaw)
 230    except ValueError:
 231        exit("Invalid method argument.")
 232
 233    # decode numeric args
 234    try:
 235        numberOfBytes = decode_numeric_arg(numberOfBytesStr)
 236        startAddress = decode_numeric_arg(startAddressStr)
 237        length = decode_numeric_arg(lengthStr)
 238    except ValueError:
 239        exit("Invalid numeric argument.")
 240
 241    if numberOfBytes < 1:
 242        exit("Number of bytes to corrupt must be at least one.")
 243
 244    # print warning if source file is an executable
 245    extension = os.path.splitext(source)[1].lower()
 246    if extension in (".com", ".exe"):
 247        print("Note: see warnings in help concerning corrupt executables.")
 248
 249    settings = {
 250        "startAddress": startAddress,
 251        "length": length,
 252        "numberOfBytes": numberOfBytes,
 253        "method": method,
 254    }
 255
 256    try:
 257        with open(source, "rb") as sourceHnd, open(target, "wb") as targetHnd:
 258            corrupt_file(sourceHnd, targetHnd, settings)
 259    except OSError:
 260        exit("File read/write error.")
 261
# run main() only when this file is executed as a script, not when it is
# imported as a module
if __name__ == "__main__":
    main()