Skip to content

scapy で CoS や ToS 付きパケットを送信するサンプル

以前に Python3 で scapy を用いてパケットを送信するサンプルスクリプト というメモを書きました。 このスクリプトを拡張し、CoS や ToS (DSCP) 値を付与出来るようにしたサンプルスクリプトをメモしておきます。 テストは Ubuntu 21.04 上の Python 3.9.5 で行いました。

scapy のインストール

Python でのパケット送信には scapy を使います。 まず scapy をインストールしておきます。

1
2
3
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python3 get-pip.py --force-reinstall
python3 -m pip install scapy

サンプルスクリプト

サンプルスクリプトは以下の通りです。 あくまでサンプルなので作り込んでおらず、「VLAN ID を指定しない場合は Src/Dst MAC を指定出来ない」(指定されても無視する) といった実装にしています。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python3

import argparse
import sys
from argparse import RawTextHelpFormatter
from scapy.all import *

def get_alphabets(count: int) -> str:
    alphabets = list(range(97, 123))
    alphabets.extend(list(range(65, 91)))
    return get_characters(alphabets, count)


def get_characters(alphabets: list, count: int) -> str:
    result = ''
    index = 0
    for i in range(0, count):
        result += chr(alphabets[index])
        index += 1
        if len(alphabets) <= index:
            index = 0
    return result


if __name__ == '__main__':
    parser = argparse.ArgumentParser(formatter_class=RawTextHelpFormatter)
    parser.add_argument('--fragmentsize', default=1480, type=int,
        help="Fragment size (Default: 1480)")
    parser.add_argument('--id', default=12345, type=int,
        help="ID (Default: 12345)")
    parser.add_argument('--length', default=100, type=int,
        help="Packet length (Default: 100)")
    parser.add_argument('--protocol', default="icmp", type=str,
        help="Protocol (Default: icmp)")
    parser.add_argument('--smac', type=str,
        help="Source MAC (Default: None)")
    parser.add_argument('--dmac', default='ff:ff:ff:ff:ff:ff', type=str,
        help="Destination MAC (Default: ff:ff:ff:ff:ff:ff)")
    parser.add_argument('--sport', default=10001, type=int,
        help="Source port (Default: 10001)")
    parser.add_argument('--dport', default=10002, type=int,
        help="Destination port (Default: 10002)")
    parser.add_argument('--vlan', default=0, type=int,
        help="VLAN ID (Default: 0)")
    parser.add_argument('--cos', default=0, type=int,
        help="CoS (Default: 0)")
    parser.add_argument('--tos', default=0, type=int,
        help="ToS (Default: 0)\n" +
        "\n" +
        "PHB  Binary  Decimal\n" +
        "---- ------- -------\n" +
        "CS0  000 000 (0)\n" +
        "CS1  001 000 (8)\n" +
        "CS2  010 000 (16)\n" +
        "CS3  011 000 (24)\n" +
        "CS4  100 000 (32)\n" +
        "CS5  101 000 (40)\n" +
        "CS6  110 000 (48)\n" +
        "CS7  111 000 (56)\n" +
        "AF11 001 010 (10)\n" +
        "AF12 001 100 (12)\n" +
        "AF13 001 110 (14)\n" +
        "AF21 010 010 (18)\n" +
        "AF22 010 100 (20)\n" +
        "AF23 010 110 (22)\n" +
        "AF31 011 010 (26)\n" +
        "AF32 011 100 (28)\n" +
        "AF33 011 110 (30)\n" +
        "AF41 100 010 (34)\n" +
        "AF42 100 100 (36)\n" +
        "AF43 100 110 (38)\n" +
        "EF   101 110 (46)")
    parser.add_argument('address', type=str,
        help="Destination address")
    args = parser.parse_args()

    payload = get_alphabets(args.length)

    if args.smac == None:
        L2 = Ether(dst=args.dmac)/Dot1Q(prio=args.cos, vlan=args.vlan, type=2048)
    else:
        L2 = Ether(dst=args.dmac, src=args.smac)/Dot1Q(prio=args.cos, vlan=args.vlan, type=2048)

    L3 = IP(dst=args.address, id=args.id, tos=args.tos << 2)

    if args.protocol.lower() == "icmp":
        L4 = ICMP()/payload
    elif args.protocol.lower() == "udp":
        L4 = UDP(sport=args.sport, dport=args.dport)/payload
    elif args.protocol.lower() == "tcp":
        L4 = TCP(sport=args.sport, dport=args.dport)/payload
    else:
        sys.exit(-1)

    fragments = fragment(L3/L4, fragsize=args.fragmentsize)

    count = 0
    for fragment in fragments:
        count += 1
        print("######################################## No." + str(count))
        if args.vlan == 0:
            send(fragment, verbose=False)
            print(ls(fragment))
        else:
            sendp(L2/fragment, verbose=False)
            print(ls(L2))
            print(ls(fragment))
    print("Sent " + str(count) + " " + args.protocol.lower() + " packet(s).")

パケットを送信する

サンプルスクリプトのファイル名が send.py の場合、以下のように実行します。

コマンド例 意味
./send.py 192.168.1.1 --protocol icmp ICMP パケットを送信する
./send.py 192.168.1.1 --protocol icmp --tos 46 DSCP に EF (46) を付与した ICMP パケットを送信する
./send.py 192.168.1.1 --protocol icmp --vlan 100 VLAN100 で ICMP パケットを送信する
./send.py 192.168.1.1 --protocol icmp --vlan 100 --cos 7 VLAN100、CoS 7 を付与した ICMP パケットを送信する

キャプチャする

tcpdump でキャプチャする場合、tcpdump -i ens160 icmp のように実行すると「非 Tagged な ICMP パケットしかキャプチャしない」(Tagged な ICMP パケットはキャプチャしない) という点に注意が必要です。

コマンド例 意味
tcpdump -i ens160 icmp Tag 無し ICMP パケットのみ、キャプチャする
tcpdump -i ens160 vlan and icmp Tag 有り ICMP パケットのみ、キャプチャする
tcpdump -i ens160 '(icmp) or (vlan and icmp)' Tag 有り/無しに関わらず、ICMP パケットをキャプチャする
tcpdump -i ens160 'ip or vlan' Tag 有り/無しに関わらず、パケットをキャプチャする