|
@@ -1,19 +1,23 @@
|
|
-import csv, os, re, sys, argparse
|
|
|
|
|
|
+import csv
|
|
|
|
+import os
|
|
|
|
+import sys
|
|
|
|
+import argparse
|
|
from logger import *
|
|
from logger import *
|
|
from datetime import datetime
|
|
from datetime import datetime
|
|
import pandas as pd
|
|
import pandas as pd
|
|
|
|
+from tqdm import tqdm
|
|
|
|
+ARGS = None
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
parse = argparse.ArgumentParser(
|
|
parse = argparse.ArgumentParser(
|
|
description="Программа для сбора информации из csv файлов")
|
|
description="Программа для сбора информации из csv файлов")
|
|
|
|
+
|
|
parse.add_argument('-p', '--path', help='Указать в какой директории искать',
|
|
parse.add_argument('-p', '--path', help='Указать в какой директории искать',
|
|
default=os.path.dirname(sys.argv[0]))
|
|
default=os.path.dirname(sys.argv[0]))
|
|
parse.add_argument('-e', '--open_encoding',
|
|
parse.add_argument('-e', '--open_encoding',
|
|
help='Указать в какой кодировке открывать', default='utf-16')
|
|
help='Указать в какой кодировке открывать', default='utf-16')
|
|
-args = parse.parse_args()
|
|
|
|
|
|
+parse.add_argument('-d', '--delimiter', help='разделитель csv', default=',')
|
|
|
|
+
|
|
|
|
|
|
-print(args)
|
|
|
|
LIST_BAD_ADAPTER = ['Wireless', 'Bluetooth', 'Wireless', 'WiFi',
|
|
LIST_BAD_ADAPTER = ['Wireless', 'Bluetooth', 'Wireless', 'WiFi',
|
|
'Kaspersky', 'VirtualBox', 'TAP-Windows',
|
|
'Kaspersky', 'VirtualBox', 'TAP-Windows',
|
|
'Wintun', '802.11', 'VMware', 'VPN', 'Wi-Fi',
|
|
'Wintun', '802.11', 'VMware', 'VPN', 'Wi-Fi',
|
|
@@ -28,6 +32,16 @@ def check_correct_controller(name) -> bool:
|
|
|
|
|
|
|
|
|
|
class ObjectReady:
|
|
class ObjectReady:
|
|
|
|
+
|
|
|
|
+ def __init__(self) -> None:
|
|
|
|
+ self.frame: str = None
|
|
|
|
+ self.cabinet: str = None
|
|
|
|
+ self.os: str = None
|
|
|
|
+ self.cpu: str = None
|
|
|
|
+ self.ram: str = None
|
|
|
|
+ self.motherboard: str = None
|
|
|
|
+ self.count_interface: int = 0
|
|
|
|
+
|
|
def get_len_dict(self) -> int:
|
|
def get_len_dict(self) -> int:
|
|
return len(self.__dict__)
|
|
return len(self.__dict__)
|
|
|
|
|
|
@@ -37,7 +51,7 @@ class ObjectReady:
|
|
|
|
|
|
def get_paths() -> list:
|
|
def get_paths() -> list:
|
|
list_path = []
|
|
list_path = []
|
|
- for root, _, files in os.walk(args.path):
|
|
|
|
|
|
+ for root, _, files in os.walk(ARGS.path):
|
|
list_path += [
|
|
list_path += [
|
|
F"{root}/{file}" for file in files if file.split('.')[-1] == 'csv' and "result" not in file]
|
|
F"{root}/{file}" for file in files if file.split('.')[-1] == 'csv' and "result" not in file]
|
|
return list_path
|
|
return list_path
|
|
@@ -57,71 +71,90 @@ def get_ser_motherboard(row: list) -> str:
|
|
|
|
|
|
def get_data_from_file(path: str, obj: ObjectReady) -> None:
|
|
def get_data_from_file(path: str, obj: ObjectReady) -> None:
|
|
global COUNT_INTERFACE
|
|
global COUNT_INTERFACE
|
|
-
|
|
|
|
- with open(path, 'r', encoding=args.open_encoding) as file:
|
|
|
|
-
|
|
|
|
- cs = csv.reader(file, delimiter=',')
|
|
|
|
|
|
|
|
- next(cs)
|
|
|
|
- line_info_computers = next(cs)
|
|
|
|
- obj.os = line_info_computers[7]
|
|
|
|
- obj.cpu = line_info_computers[13]
|
|
|
|
- obj.ram = convert_mb_to_gb(line_info_computers[14])
|
|
|
|
- obj.motherboard = None
|
|
|
|
- obj.count_interface = 0
|
|
|
|
|
|
+ with open(path, 'r', encoding=ARGS.open_encoding) as file:
|
|
|
|
+ cs = csv.reader(file, delimiter=',')
|
|
count_interface = 0
|
|
count_interface = 0
|
|
-
|
|
|
|
|
|
+
|
|
for line in cs:
|
|
for line in cs:
|
|
data = list(pd.Series(line))
|
|
data = list(pd.Series(line))
|
|
- if data[0] == '6200':
|
|
|
|
- obj.motherboard = get_ser_motherboard(data[6])
|
|
|
|
-
|
|
|
|
- if data[0] == '2600' and len(data) > 2 and check_correct_controller(data[3]):
|
|
|
|
- setattr(obj, F'ip{count_interface+1}', data[6].split(' ')[0])
|
|
|
|
- setattr(obj, F'mac{count_interface+1}', data[-3])
|
|
|
|
- count_interface += 1
|
|
|
|
|
|
+ match data[0]:
|
|
|
|
+ case '300':
|
|
|
|
+ obj.os = data[7]
|
|
|
|
+ obj.cpu = data[13]
|
|
|
|
+ obj.ram = convert_mb_to_gb(data[14])
|
|
|
|
+ case '6200':
|
|
|
|
+ obj.motherboard = get_ser_motherboard(data[6])
|
|
|
|
+ case '2600':
|
|
|
|
+ if len(data) > 2 and check_correct_controller(data[3]):
|
|
|
|
+ setattr(obj, F'ip{count_interface+1}',
|
|
|
|
+ data[6].split(' ')[0])
|
|
|
|
+ setattr(obj, F'mac{count_interface+1}', data[-3])
|
|
|
|
+ count_interface += 1
|
|
|
|
+ case _: continue
|
|
|
|
|
|
# Увеличиваем значение если количестов интерфейсов больше в этой строке
|
|
# Увеличиваем значение если количестов интерфейсов больше в этой строке
|
|
obj.count_interface = count_interface
|
|
obj.count_interface = count_interface
|
|
if count_interface > COUNT_INTERFACE:
|
|
if count_interface > COUNT_INTERFACE:
|
|
COUNT_INTERFACE = count_interface
|
|
COUNT_INTERFACE = count_interface
|
|
|
|
|
|
- file.close()
|
|
|
|
-
|
|
|
|
|
|
|
|
def get_ready_information() -> list[ObjectReady]:
|
|
def get_ready_information() -> list[ObjectReady]:
|
|
list_objects = []
|
|
list_objects = []
|
|
- for path in get_paths():
|
|
|
|
-
|
|
|
|
|
|
+ paths = get_paths()
|
|
|
|
+
|
|
|
|
+ progressbar = tqdm(range(0, len(paths)), desc='Progress')
|
|
|
|
+ for path in paths:
|
|
|
|
+
|
|
obj = ObjectReady()
|
|
obj = ObjectReady()
|
|
obj.frame = os.path.basename(os.path.dirname(path))
|
|
obj.frame = os.path.basename(os.path.dirname(path))
|
|
- obj.cabinet = re.findall(
|
|
|
|
- r"\/([\w+?\ ?[а-яА-Я]+\ ?|\w+?\ ?[a-zA-Z]+\ ?|\d+])[\.|\,]", path)[-1]
|
|
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ obj.cabinet = str(os.path.basename(path)).split('.')[0]
|
|
|
|
+ except IndexError as er:
|
|
|
|
+ logger.error(F"index error path: {path}")
|
|
|
|
+ continue
|
|
|
|
|
|
try:
|
|
try:
|
|
get_data_from_file(path, obj)
|
|
get_data_from_file(path, obj)
|
|
|
|
+ logger.info(F'Parse success file: {path}')
|
|
except UnicodeDecodeError as ude:
|
|
except UnicodeDecodeError as ude:
|
|
- logger.error(F"{path} --> {ude}\n")
|
|
|
|
|
|
+ logger.error(F"{path} --> {ude}")
|
|
except UnicodeError as er:
|
|
except UnicodeError as er:
|
|
- logger.error(F"{path} --> {er}\n")
|
|
|
|
- except StopIteration as si:
|
|
|
|
- logger.error(F"{path} --> Пустой файл или тмпо того\n")
|
|
|
|
|
|
+ logger.error(F"{path} --> {er}")
|
|
|
|
+ except StopIteration:
|
|
|
|
+ logger.error(F"{path} --> Пустой файл или тмпо того")
|
|
|
|
|
|
list_objects.append(obj)
|
|
list_objects.append(obj)
|
|
|
|
+ progressbar.update(1)
|
|
return list_objects
|
|
return list_objects
|
|
|
|
|
|
|
|
|
|
|
|
+def create_header_interface() -> str:
|
|
|
|
+ return ARGS.delimiter.join([
|
|
|
|
+ ARGS.delimiter.join((F"ip{i+1}", F"mac{i+1}"))
|
|
|
|
+ for i in range(COUNT_INTERFACE)
|
|
|
|
+ ])
|
|
|
|
+
|
|
|
|
+
|
|
def create_csv(list_obj: list[ObjectReady]) -> None:
|
|
def create_csv(list_obj: list[ObjectReady]) -> None:
|
|
namefile = F"result-{datetime.today().strftime('%Y-%m-%d-%H.%M.%S')}.xlsx"
|
|
namefile = F"result-{datetime.today().strftime('%Y-%m-%d-%H.%M.%S')}.xlsx"
|
|
data = [row.get_row() for row in list_obj]
|
|
data = [row.get_row() for row in list_obj]
|
|
df = pd.DataFrame(data=data)
|
|
df = pd.DataFrame(data=data)
|
|
- df.to_excel(namefile, index= False)
|
|
|
|
-
|
|
|
|
|
|
+ df.to_excel(namefile, index=False)
|
|
|
|
+
|
|
|
|
+
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
- create_logger()
|
|
|
|
-
|
|
|
|
- logger.info("Start programm")
|
|
|
|
- logger.info(F"Base dir: {args.path}")
|
|
|
|
-
|
|
|
|
- create_csv(get_ready_information())
|
|
|
|
- logger.info("Stop programm")
|
|
|
|
|
|
+ try:
|
|
|
|
+ logger.info("Start programm")
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ ARGS = parse.parse_args(sys.argv[1:])
|
|
|
|
+ except argparse.ArgumentError:
|
|
|
|
+ parse.print_help()
|
|
|
|
+ create_logger()
|
|
|
|
+ logger.info(F"Base dir: {ARGS.path}")
|
|
|
|
+ create_csv(get_ready_information())
|
|
|
|
+ logger.info("Stop programm")
|
|
|
|
+ except KeyboardInterrupt:
|
|
|
|
+ logger.info("Отмена операции")
|