eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

ingest_rir_transfers.py (7412B)
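
Run directly (python ingest_rir_transfers.py), the script drops into the test() coroutine at the bottom and prints each generated document to stdout; in normal use the process_data() async generator is presumably consumed by the shared ERIS ingestion machinery, which is what the placeholder parameter in its signature is there to accommodate.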

#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_rir_transfers.py

import json
import ipaddress
import time
from datetime import datetime

try:
	import aiohttp
except ImportError:
	raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')


# Set a default elasticsearch index if one is not provided
default_index = 'eris-rir-transfers'

# Transfers data sources
transfers_db = {
	'afrinic' : 'https://ftp.afrinic.net/stats/afrinic/transfers/transfers_latest.json',
	'apnic'   : 'https://ftp.apnic.net/stats/apnic/transfers/transfers_latest.json',
	'arin'    : 'https://ftp.arin.net/pub/stats/arin/transfers/transfers_latest.json',
	'lacnic'  : 'https://ftp.lacnic.net/pub/stats/lacnic/transfers/transfers_latest.json',
	'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/transfers/transfers_latest.json'
}


def construct_map() -> dict:
	'''Construct the Elasticsearch index mapping for records'''

	# Match on exact value or full text search
	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

	# Construct the index mapping
	mapping = {
		'mappings': {
			'properties': {
				'transfer_date' : { 'type': 'date' },
				'source_registration_date': { 'type': 'date' },
				'recipient_registration_date': { 'type': 'date' },
				'ip4nets' : {
					'properties': {
						'original_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } },
						'transfer_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } }
					}
				},
				'ip6nets' : {
					'properties': {
						'original_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } },
						'transfer_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } }
					}
				},
				'asns' : {
					'properties': {
						'original_set': { 'properties': { 'start': { 'type': 'integer' }, 'end': { 'type': 'integer' } } },
						'transfer_set': { 'properties': { 'start': { 'type': 'integer' }, 'end': { 'type': 'integer' } } }
					}
				},
				'type'                   : { 'type': 'keyword' },
				'source_organization'    : { 'properties': { 'name': keyword_mapping, 'country_code': { 'type': 'keyword' } } },
				'recipient_organization' : { 'properties': { 'name': keyword_mapping, 'country_code': { 'type': 'keyword' } } },
				'source_rir'             : { 'type': 'keyword' },
				'recipient_rir'          : { 'type': 'keyword' },
				'seen'                   : { 'type': 'date' }
			}
		}
	}

	return mapping
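
# With the multi-field mapping above, organization names can be matched with full-text search on
# 'source_organization.name' or exactly (and in aggregations) on 'source_organization.name.keyword'.
# Illustrative query shape only, using a name taken from the sample output at the bottom of this file:
#
#   { "query": { "term": { "source_organization.name.keyword": "Deutsche Glasfaser Wholesale GmbH" } } }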


def normalize_date(date_str: str) -> str:
	'''
	Convert date string to ISO 8601 format

	:param date_str: Date string to convert
	'''

	try:
		# Parse the date with various formats
		for fmt in ('%Y-%m-%d %H:%M:%S.%f%z', '%Y-%m-%d %H:%M:%S%z', '%Y-%m-%d %H:%M:%S'):
			try:
				dt = datetime.strptime(date_str, fmt)
				return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
			except ValueError:
				continue
		return date_str
	except Exception:
		return date_str
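
# Example of the normalization above (input shape assumed from the RIR transfer feeds):
#   normalize_date('2017-09-15 19:00:00+00:00')  ->  '2017-09-15T19:00:00Z'
# A string that matches none of the listed formats is returned unchanged.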


async def process_data(place_holder: str = None):
	'''
	Read and process the transfer data.

	:param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
	'''

	for registry, url in transfers_db.items():
		try:
			headers = {'Connection': 'keep-alive'} # This is required for AIOHTTP connections to LACNIC

			async with aiohttp.ClientSession(headers=headers) as session:
				async with session.get(url) as response:
					if response.status != 200:
						raise Exception(f'Failed to fetch {registry} transfer data: {response.status}')

					data = await response.text()

					try:
						json_data = json.loads(data)
					except json.JSONDecodeError as e:
						raise Exception(f'Failed to parse {registry} transfer data: {e}')

					if 'transfers' not in json_data:
						raise Exception(f'Invalid {registry} transfer data: {json_data}')

					for record in json_data['transfers']:
						# Record when this transfer was ingested
						record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

						# Normalize all date fields to ISO 8601
						for date_field in ('transfer_date', 'source_registration_date', 'recipient_registration_date'):
							if date_field in record:
								record[date_field] = normalize_date(record[date_field])

						# Coerce ASN range boundaries to integers (they may arrive as numeric strings)
						if 'asns' in record:
							for set_type in ('original_set', 'transfer_set'):
								for set_block in record['asns'].get(set_type, []):
									for option in ('start', 'end'):
										asn = set_block[option]
										if not isinstance(asn, int):
											if not asn.isdigit():
												raise Exception(f'Invalid {set_type} {option} ASN in {registry} data: {asn}')
											set_block[option] = int(asn)

						# Validate address boundaries, stripping leading zeros from IPv4 octets (e.g. 094.031.064.000)
						for ip_type in ('ip4nets', 'ip6nets'):
							if ip_type in record:
								for set_type in ('original_set', 'transfer_set'):
									for set_block in record[ip_type].get(set_type, []):
										for option in ('start_address', 'end_address'):
											try:
												ipaddress.ip_address(set_block[option])
											except ValueError:
												octets = set_block[option].split('.')
												normalized_ip = '.'.join(str(int(octet)) for octet in octets)
												try:
													ipaddress.ip_address(normalized_ip)
													set_block[option] = normalized_ip
												except ValueError as e:
													raise Exception(f'Invalid {set_type} {option} IP in {registry} data: {e}')

						if record['type'] not in ('MERGER_ACQUISITION', 'RESOURCE_TRANSFER'):
							raise Exception(f'Invalid transfer type in {registry} data: {record["type"]}')

						yield {'_index': default_index, '_source': record}

		except Exception as e:
			raise Exception(f'Error processing {registry} transfer data: {e}')


async def test():
	'''Test the ingestion process'''

	async for document in process_data():
		print(document)


if __name__ == '__main__':
	import asyncio

	asyncio.run(test())


'''
Output:
	{
		"transfer_date" : "2017-09-15T19:00:00Z",
		"ip4nets"       : {
			"original_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ],
			"transfer_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ]
		},
		"type"                   : "MERGER_ACQUISITION",
		"source_organization"    : { "name": "Unser Ortsnetz GmbH" },
		"recipient_organization" : {
			"name"         : "Deutsche Glasfaser Wholesale GmbH",
			"country_code" : "DE"
		},
		"source_rir"    : "RIPE NCC",
		"recipient_rir" : "RIPE NCC"
	},
	{
		"transfer_date" : "2017-09-18T19:00:00Z",
		"asns"          : {
			"original_set" : [ { "start": 198257, "end": 198257 } ],
			"transfer_set" : [ { "start": 198257, "end": 198257 } ]
		},
		"type"                   : "MERGER_ACQUISITION",
		"source_organization"    : { "name": "CERT PLIX Sp. z o.o." },
		"recipient_organization" : {
			"name"         : "Equinix (Poland) Sp. z o.o.",
			"country_code" : "PL"
		},
		"source_rir"    : "RIPE NCC",
		"recipient_rir" : "RIPE NCC"
	}

Input:
	Nothing changed from the output for now...
'''
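
Below is a minimal sketch, not part of the repository, of how this ingestor's construct_map() and process_data() might be wired to a cluster with the official elasticsearch async client. The host, the index-existence check, and the use of the async_bulk helper are assumptions for illustration; the real wiring presumably lives in the shared ERIS driver rather than in this file.

	import asyncio

	from elasticsearch import AsyncElasticsearch
	from elasticsearch.helpers import async_bulk

	import ingest_rir_transfers as ingestor


	async def main():
		# Hypothetical client; host, authentication, and TLS settings depend on the target cluster
		es = AsyncElasticsearch('https://localhost:9200')

		# Create the index with the mapping from construct_map() if it does not exist yet
		if not await es.indices.exists(index=ingestor.default_index):
			await es.indices.create(index=ingestor.default_index, mappings=ingestor.construct_map()['mappings'])

		# process_data() is an async generator of {'_index': ..., '_source': ...} actions,
		# which the async_bulk helper can consume directly
		await async_bulk(es, ingestor.process_data())

		await es.close()


	if __name__ == '__main__':
		asyncio.run(main())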