#!/usr/bin/python3
import sys
import signal
from bcc import BPF
# The interface name is mandatory; bail out before any BPF work is attempted.
if len(sys.argv) < 2:
    # Usage errors belong on stderr so they never mix with regular output.
    print("Specify interface name as first argument!", file=sys.stderr)
    sys.exit(1)
# C source of the XDP program handed to BCC.
# Policy implemented by icmp_filter():
#   * frame too short for an Ethernet header        -> XDP_DROP
#   * IPv4 frame too short for a basic IPv4 header  -> XDP_DROP
#   * IPv4 frame whose protocol field is ICMP       -> XDP_DROP
#   * everything else (IPv6 included)               -> XDP_PASS
ebpf_program_code = """
#include <uapi/linux/if_ether.h>
#include <uapi/linux/ip.h>
#include <linux/in.h>

/* True when the object x points to would extend past d. */
#define OVER(x, d) ((x) + 1 > (typeof(x))(d))

int icmp_filter(struct xdp_md *ctx) {
    void *data_end = (void *)(uintptr_t)ctx->data_end;
    struct ethhdr *eth = (void *)(uintptr_t)ctx->data;

    /* Bounds-check the Ethernet header before reading it. */
    if (OVER(eth, data_end))
        return XDP_DROP;

    /* Only IPv4 is inspected; any other EtherType is passed untouched. */
    if (eth->h_proto != htons(ETH_P_IP))
        return XDP_PASS;

    /* Bounds-check the fixed part of the IPv4 header before reading it. */
    struct iphdr *iph = (struct iphdr *)(eth + 1);
    if (OVER(iph, data_end))
        return XDP_DROP;

    if (iph->protocol == IPPROTO_ICMP)
        return XDP_DROP;

    return XDP_PASS;
}
"""
# Network device to filter on, taken from the first command-line argument.
interface_name = sys.argv[1]
# BPF object built from the embedded C source; used below to load, attach
# and detach the XDP program.
bpf = BPF(text=ebpf_program_code)
def signal_handler(sig, frame):
    """Detach the XDP program from the interface when a signal arrives.

    Args:
        sig: Number of the received signal (unused).
        frame: Stack frame that was interrupted (unused).
    """
    # Detach with the same mode flag that is used when attaching. On drivers
    # with native XDP support a flag-less detach may target the native slot
    # and leave the generic (SKB-mode) program in place.
    bpf.remove_xdp(dev=interface_name, flags=bpf.XDP_FLAGS_SKB_MODE)
# Load the program before touching signal handling or the interface, so a
# load failure leaves nothing half-configured.
xdp_fn = bpf.load_func("icmp_filter", bpf.XDP)

# Detach on Ctrl-C and also on a plain `kill` (SIGTERM); otherwise the filter
# would stay attached to the interface after the process has gone.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

bpf.attach_xdp(dev=interface_name, fn=xdp_fn, flags=bpf.XDP_FLAGS_SKB_MODE)
print(f"Blocking ICMPv4 traffic on {interface_name}")

# Sleep until a handled signal arrives; the handler runs first, then
# pause() returns and the script finishes.
signal.pause()
print("Off we pop!")