代码
from argparse import ArgumentParser
import socket
import struct
import time
import array
import random
import threading
class IcmpScan:
    """Ping-sweep a list of IPv4 addresses with ICMP echo requests.

    One raw ICMP socket is shared by the sender (``start``) and a background
    receiver thread (``recv_packet``). Every source address that sends back a
    packet carrying this scanner's identifier is recorded once in ``ok``.

    Attributes:
        ips: Addresses to probe, in sending order.
        timeout: Socket timeout in seconds.
        socket: The raw ICMP socket used for sending and receiving.
        finished: Set to True once sending is over; tells the receiver
            loop it may stop.
        ok: Addresses (as strings) that answered.
    """

    def __init__(self, ip_list, timeout=3):
        """Prepare the payload, the identifier and the raw socket.

        Args:
            ip_list: Addresses to probe.
            timeout: Socket timeout in seconds; ``start`` also waits
                ``timeout + 1`` seconds for late replies.

        Raises:
            OSError: If the raw socket cannot be opened (for example when
                the process is not allowed to create raw sockets).
        """
        self.ips = ip_list
        # Payload sent with every request: the creation time as a double.
        self.__data = struct.pack('d', time.time())
        # Identifier used to recognise replies that belong to this scan.
        self.__id = random.randint(1000, 65535)
        self.timeout = timeout
        self.socket = self.rawSocket
        self.finished = False
        self.ok = []

    @property
    def rawSocket(self):
        """Open and return a new raw ICMP socket with ``timeout`` applied.

        Every access opens another socket; ``__init__`` reads it once and
        keeps the result in ``self.socket``.

        Raises:
            OSError: If the socket cannot be created. The error is
                propagated instead of being retried, because retrying a
                failure such as missing privileges can never succeed.
        """
        sock = socket.socket(
            socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp")
        )
        try:
            sock.settimeout(self.timeout)
        except Exception:
            # Do not leak the descriptor when the timeout value is rejected.
            sock.close()
            raise
        return sock

    def inCksum(self, packet):
        """Return the 16-bit ones'-complement checksum of ``packet``.

        Args:
            packet: Bytes to checksum. Odd-length input is padded with one
                zero byte so it can be read as 16-bit words.

        Returns:
            The checksum as an int in the range 0..0xffff, computed over
            native-byte-order words (the same order ``create_packet`` uses
            when it stores the value).
        """
        if len(packet) & 1:
            packet = packet + b'\0'
        total = sum(array.array('H', packet))
        # Fold the carries above bit 15 back into the low 16 bits.
        total = (total >> 16) + (total & 0xffff)
        total += total >> 16
        return ~total & 0xffff

    def create_packet(self):
        """Build the echo request (type 8, code 0) sent to every address.

        The header is packed once with a zero checksum to compute the real
        checksum, then packed again with that checksum filled in.

        Returns:
            The 8-byte ICMP header followed by the payload, as bytes.
        """
        unchecked = struct.pack('bbHHh', 8, 0, 0, self.__id, 0) + self.__data
        checksum = self.inCksum(unchecked)
        header = struct.pack('bbHHh', 8, 0, checksum, self.__id, 0)
        return header + self.__data

    def recv_packet(self):
        """Receive replies until sending is finished and the socket is idle.

        Runs in the thread started by ``start``. A datagram counts as a
        reply when the identifier in its ICMP header equals this scanner's
        identifier; its source address is printed and stored in ``ok``.
        The loop ends on the first socket error (timeout or closed socket)
        that occurs after ``finished`` has been set.
        """
        while True:
            try:
                datagram, addr = self.socket.recvfrom(1024)
            except OSError:
                # Covers socket.timeout as well as a socket closed by start().
                if self.finished:
                    break
                continue

            if len(datagram) < 20:
                continue
            # The low nibble of the first byte is the IP header length in
            # 32-bit words; the ICMP header starts right after it.
            header_len = (datagram[0] & 0x0F) * 4
            if header_len < 20 or len(datagram) < header_len + 8:
                continue

            packet_id = struct.unpack_from("bbHHh", datagram, header_len)[3]
            if packet_id != self.__id:
                continue

            ttl = datagram[8]  # TTL is the ninth byte of the IP header.
            print(f"检测到存活设备: {addr[0]} ttl: {ttl}")
            if addr[0] not in self.ok:
                self.ok.append(str(addr[0]))

    def start(self):
        """Send one echo request to every address and collect the replies.

        Starts the receiver thread, sends the requests about one
        millisecond apart, waits ``timeout + 1`` seconds for late replies,
        then closes the socket and joins the thread. The cleanup also runs
        when the loop is interrupted, so the receiver thread cannot be left
        running. Results are available in ``ok`` afterwards.
        """
        packet = self.create_packet()
        send_interval = 1 / 1000
        receiver = threading.Thread(target=self.recv_packet)
        receiver.start()
        try:
            for ip in self.ips:
                try:
                    self.socket.sendto(packet, (ip, 0))
                except socket.timeout:
                    break
                except (OSError, ValueError):
                    # Best effort: an unreachable or malformed address must
                    # not stop the rest of the sweep.
                    continue
                time.sleep(send_interval)
            self.finished = True
            # Grace period for replies that are still on their way.
            time.sleep(self.timeout + 1)
        finally:
            self.finished = True
            self.socket.close()
            receiver.join()
if __name__ == '__main__':
    # Command-line entry point: probe <prefix><start> .. <prefix><end - 1>
    # and list every probed address that did not answer.
    py_ver = "2022.0325.1633"
    arg = ArgumentParser(description='当前脚本版本: %s' % py_ver, prog="ICMP检测")
    arg.add_argument('-i', '--ip', type=str, default='10.1.1.',
                     help="设置需要检测到IP段,默认: 10.1.1.", required=False)
    arg.add_argument('-s', '--start', type=int, default=1,
                     help="设置需要检测的起始IP,默认: 1", required=False)
    arg.add_argument('-e', '--end', type=int, default=255,
                     help="设置需要检测的终止IP,默认: 255", required=False)
    args = arg.parse_args()
    ip = args.ip
    start = args.start
    end = args.end
    # argparse already delivers str/int values; `end` itself is excluded.
    ip_list = [f"{ip}{i}" for i in range(start, end)]
    icmp_scan = IcmpScan(ip_list)
    icmp_scan.start()
    # Filter against a set instead of calling list.remove() per reply:
    # a reply from an address that was never probed would make remove()
    # raise ValueError, and repeated removal is quadratic.
    alive = set(icmp_scan.ok)
    ip_list = [candidate for candidate in ip_list if candidate not in alive]
    for i in ip_list:
        print(f"不存活的IP: {i}")
效果