eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎 |
git clone git://git.acid.vegas/eris.git |
Log | Files | Refs | Archive | README | LICENSE |
ingest_rir_transfers.py (7412B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_rir_transfers.py

import json
import ipaddress
import time
from datetime import datetime

try:
    import aiohttp
except ImportError:
    raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')


# Set a default elasticsearch index if one is not provided
default_index = 'eris-rir-transfers'

# Transfers data sources
transfers_db = {
    'afrinic' : 'https://ftp.afrinic.net/stats/afrinic/transfers/transfers_latest.json',
    'apnic'   : 'https://ftp.apnic.net/stats/apnic/transfers/transfers_latest.json',
    'arin'    : 'https://ftp.arin.net/pub/stats/arin/transfers/transfers_latest.json',
    'lacnic'  : 'https://ftp.lacnic.net/pub/stats/lacnic/transfers/transfers_latest.json',
    'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/transfers/transfers_latest.json'
}


def construct_map() -> dict:
    '''Construct the Elasticsearch index mapping for records'''

    # Match on exact value or full text search
    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

    # Construct the index mapping
    mapping = {
        'mappings': {
            'properties': {
                'transfer_date'           : { 'type': 'date' },
                'source_registration_date': { 'type': 'date' },
                'recipient_registration_date': { 'type': 'date' },
                'ip4nets' : {
                    'properties': {
                        'original_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } },
                        'transfer_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } }
                    }
                },
                'ip6nets' : {
                    'properties': {
                        'original_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } },
                        'transfer_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } }
                    }
                },
                'asns' : {
                    'properties': {
                        'original_set': { 'properties': { 'start': { 'type': 'integer' }, 'end': { 'type': 'integer' } } },
                        'transfer_set': { 'properties': { 'start': { 'type': 'integer' }, 'end': { 'type': 'integer' } } }
                    }
                },
                'type'                   : { 'type': 'keyword' },
                'source_organization'    : { 'properties': { 'name': keyword_mapping, 'country_code': { 'type': 'keyword' } } },
                'recipient_organization' : { 'properties': { 'name': keyword_mapping, 'country_code': { 'type': 'keyword' } } },
                'source_rir'             : { 'type': 'keyword' },
                'recipient_rir'          : { 'type': 'keyword' },
                'seen'                   : { 'type': 'date' }
            }
        }
    }

    return mapping


def normalize_date(date_str: str) -> str:
    '''
    Convert a date string to ISO 8601 format (best effort).

    Returns the input unchanged when it matches none of the known formats,
    so callers never lose the original value.

    :param date_str: Date string to convert
    '''

    # Parse the date with various formats; first match wins
    for fmt in ('%Y-%m-%d %H:%M:%S.%f%z', '%Y-%m-%d %H:%M:%S%z', '%Y-%m-%d %H:%M:%S'):
        try:
            return datetime.strptime(date_str, fmt).strftime('%Y-%m-%dT%H:%M:%SZ')
        except (ValueError, TypeError): # TypeError covers non-string input (e.g. a JSON null)
            continue

    return date_str


def _normalize_asns(record: dict, registry: str):
    '''
    Coerce string ASN boundaries in a transfer record to integers, in place.

    :param record: A single transfer record (mutated in place)
    :param registry: Registry name, used only in error messages
    :raises Exception: If an ASN is neither an int nor a digit string
    '''

    if 'asns' not in record:
        return

    for set_type in ('original_set', 'transfer_set'):
        for set_block in record['asns'].get(set_type, []):
            for option in ('start', 'end'):
                asn = set_block[option]
                if not isinstance(asn, int):
                    if not asn.isdigit():
                        raise Exception(f'Invalid {set_type} {option} ASN in {registry} data: {asn}')
                    set_block[option] = int(asn)


def _normalize_ips(record: dict, registry: str):
    '''
    Validate the IP boundaries in a transfer record, repairing zero-padded
    IPv4 octets (e.g. 192.168.001.001) in place.

    :param record: A single transfer record (mutated in place)
    :param registry: Registry name, used only in error messages
    :raises Exception: If an address is invalid and cannot be repaired
    '''

    for ip_type in ('ip4nets', 'ip6nets'):
        if ip_type not in record:
            continue

        for set_type in ('original_set', 'transfer_set'):
            for set_block in record[ip_type].get(set_type, []):
                for option in ('start_address', 'end_address'):
                    try:
                        ipaddress.ip_address(set_block[option])
                    except ValueError:
                        # Some registries publish zero-padded octets; strip the padding and retry.
                        # The int() conversion lives inside the try so a non-numeric octet also
                        # produces the contextual error instead of an anonymous ValueError.
                        try:
                            normalized_ip = '.'.join(str(int(octet)) for octet in set_block[option].split('.'))
                            ipaddress.ip_address(normalized_ip)
                        except ValueError as e:
                            raise Exception(f'Invalid {set_type} {option} IP in {registry} data: {e}')
                        set_block[option] = normalized_ip


async def process_data(place_holder: str = None):
    '''
    Fetch and process the transfer data from every registry, yielding
    Elasticsearch bulk-index documents.

    :param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
    '''

    headers = {'Connection': 'keep-alive'} # This is required for AIOHTTP connections to LACNIC

    # A single session is reused for every registry instead of opening one per request
    async with aiohttp.ClientSession(headers=headers) as session:
        for registry, url in transfers_db.items():
            try:
                async with session.get(url) as response:
                    if response.status != 200:
                        raise Exception(f'Failed to fetch {registry} delegation data: {response.status}')

                    data = await response.text()

                try:
                    json_data = json.loads(data)
                except json.JSONDecodeError as e:
                    raise Exception(f'Failed to parse {registry} delegation data: {e}')

                if 'transfers' not in json_data:
                    raise Exception(f'Invalid {registry} delegation data: {json_data}')

                for record in json_data['transfers']:
                    record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

                    # Normalize all date fields
                    for date_field in ('transfer_date', 'source_registration_date', 'recipient_registration_date'):
                        if date_field in record:
                            record[date_field] = normalize_date(record[date_field])

                    _normalize_asns(record, registry)
                    _normalize_ips(record, registry)

                    if record['type'] not in ('MERGER_ACQUISITION', 'RESOURCE_TRANSFER'):
                        raise Exception(f'Invalid transfer type in {registry} data: {record["type"]}')

                    yield {'_index': default_index, '_source': record}

            except Exception as e:
                # Re-wrap with registry context; 'from e' preserves the original traceback
                raise Exception(f'Error processing {registry} delegation data: {e}') from e


async def test():
    '''Test the ingestion process'''

    async for document in process_data():
        print(document)


if __name__ == '__main__':
    import asyncio

    asyncio.run(test())


'''
Output:
    {
        "transfer_date" : "2017-09-15T19:00:00Z",
        "ip4nets" : {
            "original_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ],
            "transfer_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ]
        },
        "type" : "MERGER_ACQUISITION",
        "source_organization" : { "name": "Unser Ortsnetz GmbH" },
        "recipient_organization" : {
            "name" : "Deutsche Glasfaser Wholesale GmbH",
            "country_code" : "DE"
        },
        "source_rir" : "RIPE NCC",
        "recipient_rir" : "RIPE NCC"
    },
    {
        "transfer_date" : "2017-09-18T19:00:00Z",
        "asns" : {
            "original_set" : [ { "start": 198257, "end": 198257 } ],
            "transfer_set" : [ { "start": 198257, "end": 198257 } ]
        },
        "type" : "MERGER_ACQUISITION",
        "source_organization" : { "name": "CERT PLIX Sp. z o.o." },
        "recipient_organization" : {
            "name" : "Equinix (Poland) Sp. z o.o.",
            "country_code" : "PL"
        },
        "source_rir" : "RIPE NCC",
        "recipient_rir" : "RIPE NCC"
    }

Input:
    Nothing changed from the output for now...
'''