API Documentation
Shadowserver API abstraction library.
API documentation is available at: https://www.shadowserver.org/what-we-do/network-reporting/api-documentation/
ShadowServerAPI
Source code in shadowserver_api/api.py
class ShadowServerAPI:
def __init__(self, api_url: str, api_key: str, api_secret: str, timeout: int = 45) -> None:
self.api_url = api_url
self.api_key = api_key
self.api_secret_bytes = api_secret.encode()
self.session = httpx.Client(timeout=timeout)
self.logger = logging.getLogger(__name__)
def _generate_hmac(self, request_bytes: bytes) -> str:
hmac_generator = hmac.new(self.api_secret_bytes, request_bytes, hashlib.sha256)
return hmac_generator.hexdigest()
def api_call(self, method: str, request_data: dict[str, typing.Any]) -> typing.Any:
"""Call the specified api method with a request dictionary."""
url = f'{self.api_url}{method}'
self.logger.debug('URL: %s', url)
self.logger.debug('DATA: %s', json.dumps(request_data, indent=2))
request_data['apikey'] = self.api_key
request_bytes = json.dumps(request_data).encode()
request_hmac = self._generate_hmac(request_bytes)
response = self.session.post(url, content=request_bytes, headers={'HMAC2': request_hmac})
if response.status_code != 200:
try:
response_json = response.json()
raise InvalidRequest(response_json['error'])
except (ValueError, KeyError, json.decoder.JSONDecodeError) as exc:
if 'No supported report filters found' in response.text:
raise NoSupportedReportFilter(response.text) from exc
raise InvalidRequest(response.text) from exc
try:
return response.json()
except ValueError as exc:
raise InvalidResponse(response.text) from exc
def api_test_ping(self) -> dict[str, typing.Any]:
"""Check your connection to the API server."""
result = self.api_call('test/ping', {})
if not isinstance(result, dict):
raise ValueError('Invalid return value from api_call')
return result
def api_key_info(self) -> dict[str, typing.Any]:
"""Returns details about your apikey."""
result = self.api_call('key/info', {})
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
if len(result) != 1:
raise ValueError('Invalid return value from api_call')
return result[0]
def api_reports_subscribed(self) -> list[str]:
"""List of reports that the user is subscribed to."""
result = self.api_call('reports/subscribed', {})
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
def api_reports_types(self) -> list[str]:
"""List of all the types of reports that are available for the subscriber."""
result = self.api_call('reports/types', {})
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
def api_reports_list(
self,
reports: list[str] | None = None,
start_date: str | None = None,
end_date: str | None = None,
report_type: str | None = None,
limit: int | None = None,
) -> list[dict[str, typing.Any]]:
"""List of actual reports that could be downloaded."""
request_data: dict[str, typing.Any] = {}
if reports:
request_data['reports'] = reports
if limit:
request_data['limit'] = limit
if start_date:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if report_type:
request_data['type'] = report_type
result = self.api_call('reports/list', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
def api_reports_download(self, report_id: str) -> list[dict[str, typing.Any]]:
"""Download specific report."""
download_url = f'https://dl.shadowserver.org/{report_id}'
res = self.session.get(download_url)
raw = io.StringIO(res.text)
csv_reader = csv.DictReader(raw)
data = list(csv_reader)
return data
def api_reports_query(
self,
query: dict[str, str],
sort: str = 'ascending',
start_date: str | None = None,
end_date: str | None = None,
facet: str | None = None,
limit: int = 1000,
pagination: bool = True,
) -> typing.Iterator[dict[str, typing.Any]]:
"""Do a reports query.
Args:
query: The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details.
sort: Sort the output. Valid values are "ascending", "descending".
start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
facet: Returns the cardinality of each value of the given field sorted from highest to lowest.
limit: Number of records to pull.
pagination: If set to True, automatically pulls all results using pagination.
"""
if sort not in ('ascending', 'descending'):
raise ValueError('"sort" parameter must be one of: "ascending", "descending"')
request_data = {
'query': query,
'sort': sort,
'page': '1',
}
if start_date is not None:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date is not None:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if facet:
request_data['facet'] = facet
if limit:
request_data['limit'] = str(limit)
result = self.api_call('reports/query', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
for res in result:
yield res
# only do automatic pagination if page parameter was set
if pagination:
_page = 1
while result:
_page += 1
request_data['page'] = str(_page)
result = self.api_call('reports/query', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
for res in result:
yield res
def api_reports_stats(
self,
start_date: str | None = None,
end_date: str | None = None,
report: typing.Sequence[str] | None = None,
report_type: typing.Sequence[str] | None = None,
) -> list[tuple[str, str, str]]:
"""Fetch report statistics.
Args:
start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
report: Filter by report name.
report_type: Filter by report type.
"""
request_data: dict[str, typing.Any] = {}
if report:
request_data['report'] = report
if start_date is not None:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date is not None:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if report_type:
request_data['type'] = report_type
result = self.api_call('reports/stats', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
def api_reports_device_info(
self,
query: dict[str, str],
) -> dict[str, typing.Any]:
"""Do a reports device-info query, which provides device details for a given IP.
Args:
query: Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope.
"""
for k in query.keys():
if k not in ('ip', 'asn', 'geo'):
raise ValueError('query parameter may only contain the following keys: "ip", "asn", "geo".')
request_data = {'query': query}
result = self.api_call('reports/device-info', request_data)
return result
def api_reports_schema(self, report_type: str) -> dict[str, typing.Any]:
"""Return a schema for a given report type.
Args:
report_type: Report type to get the schema for.
"""
request_data = {'type': report_type}
result = self.api_call('reports/schema', request_data)
return result
api_call(method, request_data)
Call the specified api method with a request dictionary.
Source code in shadowserver_api/api.py
def api_call(self, method: str, request_data: dict[str, typing.Any]) -> typing.Any:
    """Call the specified api method with a request dictionary.

    Args:
        method: API method path appended to the base URL, e.g. ``reports/list``.
        request_data: Request parameters. The dictionary is not modified.

    Returns:
        The decoded JSON response body.

    Raises:
        InvalidRequest: If the server answers with a status other than 200.
        NoSupportedReportFilter: If a non-200 response without a usable JSON
            ``error`` field contains "No supported report filters found".
        InvalidResponse: If a 200 response body is not valid JSON.
    """
    url = f'{self.api_url}{method}'
    self.logger.debug('URL: %s', url)
    self.logger.debug('DATA: %s', json.dumps(request_data, indent=2))
    # Add the key to a copy. Writing it into the caller's dictionary would
    # leak it into the debug log above whenever the same dictionary is sent
    # again (as the paginated query does).
    signed_request = {**request_data, 'apikey': self.api_key}
    request_bytes = json.dumps(signed_request).encode()
    request_hmac = self._generate_hmac(request_bytes)
    response = self.session.post(url, content=request_bytes, headers={'HMAC2': request_hmac})
    if response.status_code != 200:
        # NOTE(review): the raise stays inside the try on purpose; whether
        # InvalidRequest is itself caught below depends on its base class,
        # which is not visible here.
        try:
            response_json = response.json()
            raise InvalidRequest(response_json['error'])
        except (ValueError, KeyError, json.decoder.JSONDecodeError) as exc:
            if 'No supported report filters found' in response.text:
                raise NoSupportedReportFilter(response.text) from exc
            raise InvalidRequest(response.text) from exc
    try:
        return response.json()
    except ValueError as exc:
        raise InvalidResponse(response.text) from exc
api_key_info()
Returns details about your apikey.
Source code in shadowserver_api/api.py
def api_key_info(self) -> dict[str, typing.Any]:
    """Return the details of the API key in use.

    Raises:
        ValueError: If the ``key/info`` response is not a list holding
            exactly one entry.
    """
    entries = self.api_call('key/info', {})
    if not isinstance(entries, list) or len(entries) != 1:
        raise ValueError('Invalid return value from api_call')
    (details,) = entries
    return details
api_reports_device_info(query)
Do a reports device-info query, which provides device details for a given IP.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `dict[str, str]` | Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope. | required |
Source code in shadowserver_api/api.py
def api_reports_device_info(
    self,
    query: dict[str, str],
) -> dict[str, typing.Any]:
    """Run a ``reports/device-info`` query, which provides device details for a given IP.

    Args:
        query: Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope.

    Raises:
        ValueError: If ``query`` has a key other than "ip", "asn" or "geo".
    """
    allowed_keys = ('ip', 'asn', 'geo')
    if any(key not in allowed_keys for key in query):
        raise ValueError('query parameter may only contain the following keys: "ip", "asn", "geo".')
    return self.api_call('reports/device-info', {'query': query})
api_reports_download(report_id)
Download specific report.
Source code in shadowserver_api/api.py
def api_reports_download(self, report_id: str) -> list[dict[str, typing.Any]]:
    """Download one report and parse it as CSV.

    Args:
        report_id: Identifier appended to ``https://dl.shadowserver.org/``.

    Returns:
        One dictionary per CSV row, keyed by the CSV header.
    """
    # NOTE(review): the HTTP status is not checked, so an error page would be
    # parsed as CSV - confirm whether that is intended.
    response = self.session.get(f'https://dl.shadowserver.org/{report_id}')
    return list(csv.DictReader(io.StringIO(response.text)))
api_reports_list(reports=None, start_date=None, end_date=None, report_type=None, limit=None)
List of actual reports that could be downloaded.
Source code in shadowserver_api/api.py
def api_reports_list(
self,
reports: list[str] | None = None,
start_date: str | None = None,
end_date: str | None = None,
report_type: str | None = None,
limit: int | None = None,
) -> list[dict[str, typing.Any]]:
"""List of actual reports that could be downloaded."""
request_data: dict[str, typing.Any] = {}
if reports:
request_data['reports'] = reports
if limit:
request_data['limit'] = limit
if start_date:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if report_type:
request_data['type'] = report_type
result = self.api_call('reports/list', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
api_reports_query(query, sort='ascending', start_date=None, end_date=None, facet=None, limit=1000, pagination=True)
Do a reports query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `dict[str, str]` | The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details. | required |
| `sort` | `str` | Sort the output. Valid values are "ascending", "descending". | `'ascending'` |
| `start_date` | `str \| None` | Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now". | `None` |
| `end_date` | `str \| None` | Fetch results up to this date. Format must be (YYYY-MM-DD) or "now". | `None` |
| `facet` | `str \| None` | Returns the cardinality of each value of the given field sorted from highest to lowest. | `None` |
| `limit` | `int` | Number of records to pull. | `1000` |
| `pagination` | `bool` | If set to True, automatically pulls all results using pagination. | `True` |
Source code in shadowserver_api/api.py
def api_reports_query(
self,
query: dict[str, str],
sort: str = 'ascending',
start_date: str | None = None,
end_date: str | None = None,
facet: str | None = None,
limit: int = 1000,
pagination: bool = True,
) -> typing.Iterator[dict[str, typing.Any]]:
"""Do a reports query.
Args:
query: The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details.
sort: Sort the output. Valid values are "ascending", "descending".
start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
facet: Returns the cardinality of each value of the given field sorted from highest to lowest.
limit: Number of records to pull.
pagination: If set to True, automatically pulls all results using pagination.
"""
if sort not in ('ascending', 'descending'):
raise ValueError('"sort" parameter must be one of: "ascending", "descending"')
request_data = {
'query': query,
'sort': sort,
'page': '1',
}
if start_date is not None:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date is not None:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if facet:
request_data['facet'] = facet
if limit:
request_data['limit'] = str(limit)
result = self.api_call('reports/query', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
for res in result:
yield res
# only do automatic pagination if page parameter was set
if pagination:
_page = 1
while result:
_page += 1
request_data['page'] = str(_page)
result = self.api_call('reports/query', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
for res in result:
yield res
api_reports_schema(report_type)
Return a schema for a given report type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `report_type` | `str` | Report type to get the schema for. | required |
Source code in shadowserver_api/api.py
def api_reports_schema(self, report_type: str) -> dict[str, typing.Any]:
    """Fetch the schema of a report type via ``reports/schema``.

    Args:
        report_type: Report type to get the schema for.
    """
    return self.api_call('reports/schema', {'type': report_type})
api_reports_stats(start_date=None, end_date=None, report=None, report_type=None)
Fetch report statistics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `start_date` | `str \| None` | Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now". | `None` |
| `end_date` | `str \| None` | Fetch results up to this date. Format must be (YYYY-MM-DD) or "now". | `None` |
| `report` | `Sequence[str] \| None` | Filter by report name. | `None` |
| `report_type` | `Sequence[str] \| None` | Filter by report type. | `None` |
Source code in shadowserver_api/api.py
def api_reports_stats(
self,
start_date: str | None = None,
end_date: str | None = None,
report: typing.Sequence[str] | None = None,
report_type: typing.Sequence[str] | None = None,
) -> list[tuple[str, str, str]]:
"""Fetch report statistics.
Args:
start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
report: Filter by report name.
report_type: Filter by report type.
"""
request_data: dict[str, typing.Any] = {}
if report:
request_data['report'] = report
if start_date is not None:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date is not None:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if report_type:
request_data['type'] = report_type
result = self.api_call('reports/stats', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
api_reports_subscribed()
List of reports that the user is subscribed to.
Source code in shadowserver_api/api.py
def api_reports_subscribed(self) -> list[str]:
    """Return the reports the user is subscribed to.

    Raises:
        ValueError: If the ``reports/subscribed`` response is not a list.
    """
    subscribed = self.api_call('reports/subscribed', {})
    if isinstance(subscribed, list):
        return subscribed
    raise ValueError('Invalid return value from api_call')
api_reports_types()
List of all the types of reports that are available for the subscriber.
Source code in shadowserver_api/api.py
def api_reports_types(self) -> list[str]:
    """Return all report types that are available for the subscriber.

    Raises:
        ValueError: If the ``reports/types`` response is not a list.
    """
    report_types = self.api_call('reports/types', {})
    if isinstance(report_types, list):
        return report_types
    raise ValueError('Invalid return value from api_call')
api_test_ping()
Check your connection to the API server.
Source code in shadowserver_api/api.py
def api_test_ping(self) -> dict[str, typing.Any]:
    """Check the connection to the API server via ``test/ping``.

    Raises:
        ValueError: If the response is not a dictionary.
    """
    pong = self.api_call('test/ping', {})
    if isinstance(pong, dict):
        return pong
    raise ValueError('Invalid return value from api_call')
Shadowserver API abstraction library.
API documentation is available at: https://www.shadowserver.org/what-we-do/network-reporting/api-documentation/
AsyncShadowServerAPI
Source code in shadowserver_api/async_api.py
class AsyncShadowServerAPI:
def __init__(self, api_url: str, api_key: str, api_secret: str, timeout: int = 45) -> None:
self.api_url = api_url
self.api_key = api_key
self.api_secret_bytes = api_secret.encode()
self.session = httpx.AsyncClient(timeout=timeout)
self.logger = logging.getLogger(__name__)
def _generate_hmac(self, request_bytes: bytes) -> str:
hmac_generator = hmac.new(self.api_secret_bytes, request_bytes, hashlib.sha256)
return hmac_generator.hexdigest()
async def api_call(self, method: str, request_data: dict[str, typing.Any]) -> typing.Any:
"""Call the specified api method with a request dictionary."""
url = f'{self.api_url}{method}'
self.logger.debug('URL: %s', url)
self.logger.debug('DATA: %s', json.dumps(request_data, indent=2))
request_data['apikey'] = self.api_key
request_bytes = json.dumps(request_data).encode()
request_hmac = self._generate_hmac(request_bytes)
response = await self.session.post(url, content=request_bytes, headers={'HMAC2': request_hmac})
if response.status_code != 200:
try:
response_json = response.json()
raise InvalidRequest(response_json['error'])
except (ValueError, KeyError, json.decoder.JSONDecodeError) as exc:
if 'No supported report filters found' in response.text:
raise NoSupportedReportFilter(response.text) from exc
raise InvalidRequest(response.text) from exc
try:
return response.json()
except ValueError as exc:
raise InvalidResponse(response.text) from exc
async def api_test_ping(self) -> dict[str, typing.Any]:
"""Check your connection to the API server."""
result = await self.api_call('test/ping', {})
if not isinstance(result, dict):
raise ValueError('Invalid return value from api_call')
return result
async def api_key_info(self) -> dict[str, typing.Any]:
"""Returns details about your apikey."""
result = await self.api_call('key/info', {})
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
if len(result) != 1:
raise ValueError('Invalid return value from api_call')
return result[0]
async def api_reports_subscribed(self) -> list[str]:
"""List of reports that the user is subscribed to."""
result = await self.api_call('reports/subscribed', {})
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
async def api_reports_types(self) -> list[str]:
"""List of all the types of reports that are available for the subscriber."""
result = await self.api_call('reports/types', {})
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
async def api_reports_list(
self,
reports: list[str] | None = None,
start_date: str | None = None,
end_date: str | None = None,
report_type: str | None = None,
limit: int | None = None,
) -> list[dict[str, typing.Any]]:
"""List of actual reports that could be downloaded."""
request_data: dict[str, typing.Any] = {}
if reports:
request_data['reports'] = reports
if limit:
request_data['limit'] = limit
if start_date:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if report_type:
request_data['type'] = report_type
result = await self.api_call('reports/list', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
async def api_reports_download(self, report_id: str) -> list[dict[str, typing.Any]]:
"""Download specific report."""
download_url = f'https://dl.shadowserver.org/{report_id}'
res = await self.session.get(download_url)
raw = io.StringIO(res.text)
csv_reader = csv.DictReader(raw)
data = list(csv_reader)
return data
async def api_reports_query(
self,
query: dict[str, str],
sort: str = 'ascending',
start_date: str | None = None,
end_date: str | None = None,
facet: str | None = None,
limit: int = 1000,
pagination: bool = True,
) -> typing.AsyncIterator[dict[str, typing.Any]]:
"""Do a reports query.
Args:
query: The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details.
sort: Sort the output. Valid values are "ascending", "descending".
start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
facet: Returns the cardinality of each value of the given field sorted from highest to lowest.
limit: Number of records to pull.
pagination: If set to True, automatically pulls all results using pagination.
"""
if sort not in ('ascending', 'descending'):
raise ValueError('"sort" parameter must be one of: "ascending", "descending"')
request_data = {
'query': query,
'sort': sort,
'page': '1',
}
if start_date is not None:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date is not None:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if facet:
request_data['facet'] = facet
if limit:
request_data['limit'] = str(limit)
result = await self.api_call('reports/query', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
for res in result:
yield res
# only do automatic pagination if page parameter was set
if pagination:
_page = 1
while result:
_page += 1
request_data['page'] = str(_page)
result = self.api_call('reports/query', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
for res in result:
yield res
async def api_reports_stats(
self,
start_date: str | None = None,
end_date: str | None = None,
report: typing.Sequence[str] | None = None,
report_type: typing.Sequence[str] | None = None,
) -> list[tuple[str, str, str]]:
"""Fetch report statistics.
Args:
start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
report: Filter by report name.
report_type: Filter by report type.
"""
request_data: dict[str, typing.Any] = {}
if report:
request_data['report'] = report
if start_date is not None:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date is not None:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if report_type:
request_data['type'] = report_type
result = await self.api_call('reports/stats', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
async def api_reports_device_info(
self,
query: dict[str, str],
) -> dict[str, typing.Any]:
"""Do a reports device-info query, which provides device details for a given IP.
Args:
query: Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope.
"""
for k in query.keys():
if k not in ('ip', 'asn', 'geo'):
raise ValueError('query parameter may only contain the following keys: "ip", "asn", "geo".')
request_data = {'query': query}
result = await self.api_call('reports/device-info', request_data)
return result
async def api_reports_schema(self, report_type: str) -> dict[str, typing.Any]:
"""Return a schema for a given report type.
Args:
report_type: Report type to get the schema for.
"""
request_data = {'type': report_type}
result = await self.api_call('reports/schema', request_data)
return result
api_call(method, request_data)
async
Call the specified api method with a request dictionary.
Source code in shadowserver_api/async_api.py
async def api_call(self, method: str, request_data: dict[str, typing.Any]) -> typing.Any:
    """Call the specified api method with a request dictionary.

    Args:
        method: API method path appended to the base URL, e.g. ``reports/list``.
        request_data: Request parameters. The dictionary is not modified.

    Returns:
        The decoded JSON response body.

    Raises:
        InvalidRequest: If the server answers with a status other than 200.
        NoSupportedReportFilter: If a non-200 response without a usable JSON
            ``error`` field contains "No supported report filters found".
        InvalidResponse: If a 200 response body is not valid JSON.
    """
    url = f'{self.api_url}{method}'
    self.logger.debug('URL: %s', url)
    self.logger.debug('DATA: %s', json.dumps(request_data, indent=2))
    # Add the key to a copy. Writing it into the caller's dictionary would
    # leak it into the debug log above whenever the same dictionary is sent
    # again (as the paginated query does).
    signed_request = {**request_data, 'apikey': self.api_key}
    request_bytes = json.dumps(signed_request).encode()
    request_hmac = self._generate_hmac(request_bytes)
    response = await self.session.post(url, content=request_bytes, headers={'HMAC2': request_hmac})
    if response.status_code != 200:
        # NOTE(review): the raise stays inside the try on purpose; whether
        # InvalidRequest is itself caught below depends on its base class,
        # which is not visible here.
        try:
            response_json = response.json()
            raise InvalidRequest(response_json['error'])
        except (ValueError, KeyError, json.decoder.JSONDecodeError) as exc:
            if 'No supported report filters found' in response.text:
                raise NoSupportedReportFilter(response.text) from exc
            raise InvalidRequest(response.text) from exc
    try:
        return response.json()
    except ValueError as exc:
        raise InvalidResponse(response.text) from exc
api_key_info()
async
Returns details about your apikey.
Source code in shadowserver_api/async_api.py
async def api_key_info(self) -> dict[str, typing.Any]:
    """Return the details of the API key in use.

    Raises:
        ValueError: If the ``key/info`` response is not a list holding
            exactly one entry.
    """
    entries = await self.api_call('key/info', {})
    if not isinstance(entries, list) or len(entries) != 1:
        raise ValueError('Invalid return value from api_call')
    (details,) = entries
    return details
api_reports_device_info(query)
async
Do a reports device-info query, which provides device details for a given IP.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `dict[str, str]` | Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope. | required |
Source code in shadowserver_api/async_api.py
async def api_reports_device_info(
    self,
    query: dict[str, str],
) -> dict[str, typing.Any]:
    """Run a ``reports/device-info`` query, which provides device details for a given IP.

    Args:
        query: Dictionary containing search criteria. Note that the query must be limited to your organisation's report scope.

    Raises:
        ValueError: If ``query`` has a key other than "ip", "asn" or "geo".
    """
    allowed_keys = ('ip', 'asn', 'geo')
    if any(key not in allowed_keys for key in query):
        raise ValueError('query parameter may only contain the following keys: "ip", "asn", "geo".')
    return await self.api_call('reports/device-info', {'query': query})
api_reports_download(report_id)
async
Download specific report.
Source code in shadowserver_api/async_api.py
async def api_reports_download(self, report_id: str) -> list[dict[str, typing.Any]]:
    """Download one report and parse it as CSV.

    Args:
        report_id: Identifier appended to ``https://dl.shadowserver.org/``.

    Returns:
        One dictionary per CSV row, keyed by the CSV header.
    """
    # NOTE(review): the HTTP status is not checked, so an error page would be
    # parsed as CSV - confirm whether that is intended.
    response = await self.session.get(f'https://dl.shadowserver.org/{report_id}')
    return list(csv.DictReader(io.StringIO(response.text)))
api_reports_list(reports=None, start_date=None, end_date=None, report_type=None, limit=None)
async
List of actual reports that could be downloaded.
Source code in shadowserver_api/async_api.py
async def api_reports_list(
self,
reports: list[str] | None = None,
start_date: str | None = None,
end_date: str | None = None,
report_type: str | None = None,
limit: int | None = None,
) -> list[dict[str, typing.Any]]:
"""List of actual reports that could be downloaded."""
request_data: dict[str, typing.Any] = {}
if reports:
request_data['reports'] = reports
if limit:
request_data['limit'] = limit
if start_date:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if report_type:
request_data['type'] = report_type
result = await self.api_call('reports/list', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
return result
api_reports_query(query, sort='ascending', start_date=None, end_date=None, facet=None, limit=1000, pagination=True)
async
Do a reports query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `dict[str, str]` | The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details. | required |
| `sort` | `str` | Sort the output. Valid values are "ascending", "descending". | `'ascending'` |
| `start_date` | `str \| None` | Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now". | `None` |
| `end_date` | `str \| None` | Fetch results up to this date. Format must be (YYYY-MM-DD) or "now". | `None` |
| `facet` | `str \| None` | Returns the cardinality of each value of the given field sorted from highest to lowest. | `None` |
| `limit` | `int` | Number of records to pull. | `1000` |
| `pagination` | `bool` | If set to True, automatically pulls all results using pagination. | `True` |
Source code in shadowserver_api/async_api.py
async def api_reports_query(
self,
query: dict[str, str],
sort: str = 'ascending',
start_date: str | None = None,
end_date: str | None = None,
facet: str | None = None,
limit: int = 1000,
pagination: bool = True,
) -> typing.AsyncIterator[dict[str, typing.Any]]:
"""Do a reports query.
Args:
query: The query must contain one attribute that matches your organization’s report filters. See https://www.shadowserver.org/what-we-do/network-reporting/api-reports-query/ for details.
sort: Sort the output. Valid values are "ascending", "descending".
start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
facet: Returns the cardinality of each value of the given field sorted from highest to lowest.
limit: Number of records to pull.
pagination: If set to True, automatically pulls all results using pagination.
"""
if sort not in ('ascending', 'descending'):
raise ValueError('"sort" parameter must be one of: "ascending", "descending"')
request_data = {
'query': query,
'sort': sort,
'page': '1',
}
if start_date is not None:
if not regex_date.match(start_date):
raise ValueError('Invalid start_date format.')
if end_date is not None:
if not regex_date.match(end_date):
raise ValueError('Invalid end_date format.')
request_data['date'] = f'{start_date}:{end_date}'
else:
request_data['date'] = f'{start_date}'
if facet:
request_data['facet'] = facet
if limit:
request_data['limit'] = str(limit)
result = await self.api_call('reports/query', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
for res in result:
yield res
# only do automatic pagination if page parameter was set
if pagination:
_page = 1
while result:
_page += 1
request_data['page'] = str(_page)
result = self.api_call('reports/query', request_data)
if not isinstance(result, list):
raise ValueError('Invalid return value from api_call')
for res in result:
yield res
api_reports_schema(report_type)
async
Return a schema for a given report type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `report_type` | `str` | Report type to get the schema for. | required |
Source code in shadowserver_api/async_api.py
async def api_reports_schema(self, report_type: str) -> dict[str, typing.Any]:
    """Return a schema for a given report type.

    The result of the ``reports/schema`` call is returned unchanged.

    Args:
        report_type: Report type to get the schema for.
    """
    return await self.api_call('reports/schema', {'type': report_type})
api_reports_stats(start_date=None, end_date=None, report=None, report_type=None)
async
Fetch report statistics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `start_date` | `str \| None` | Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now". | `None` |
| `end_date` | `str \| None` | Fetch results up to this date. Format must be (YYYY-MM-DD) or "now". | `None` |
| `report` | `Sequence[str] \| None` | Filter by report name. | `None` |
| `report_type` | `Sequence[str] \| None` | Filter by report type. | `None` |
Source code in shadowserver_api/async_api.py
async def api_reports_stats(
    self,
    start_date: str | None = None,
    end_date: str | None = None,
    report: typing.Sequence[str] | None = None,
    report_type: typing.Sequence[str] | None = None,
) -> list[tuple[str, str, str]]:
    """Fetch report statistics.

    Args:
        start_date: Fetch results starting from this date. Format must be (YYYY-MM-DD) or "now".
        end_date: Fetch results up to this date. Format must be (YYYY-MM-DD) or "now".
            Only used when ``start_date`` is given.
        report: Filter by report name.
        report_type: Filter by report type.

    Raises:
        ValueError: If a date does not match ``regex_date`` or the API
            result is not a list.
    """
    payload: dict[str, typing.Any] = {}
    if report:
        payload['report'] = report

    if start_date is not None:
        if not regex_date.match(start_date):
            raise ValueError('Invalid start_date format.')
        if end_date is None:
            # Single date: sent without a range separator.
            payload['date'] = f'{start_date}'
        else:
            if not regex_date.match(end_date):
                raise ValueError('Invalid end_date format.')
            payload['date'] = f'{start_date}:{end_date}'

    if report_type:
        payload['type'] = report_type

    response = await self.api_call('reports/stats', payload)
    if isinstance(response, list):
        return response
    raise ValueError('Invalid return value from api_call')
api_reports_subscribed()
async
List of reports that the user is subscribed to.
Source code in shadowserver_api/async_api.py
async def api_reports_subscribed(self) -> list[str]:
    """List of reports that the user is subscribed to.

    Raises:
        ValueError: If the ``reports/subscribed`` result is not a list.
    """
    response = await self.api_call('reports/subscribed', {})
    if isinstance(response, list):
        return response
    raise ValueError('Invalid return value from api_call')
api_reports_types()
async
List of all the types of reports that are available for the subscriber.
Source code in shadowserver_api/async_api.py
async def api_reports_types(self) -> list[str]:
    """List of all the types of reports that are available for the subscriber.

    Raises:
        ValueError: If the ``reports/types`` result is not a list.
    """
    response = await self.api_call('reports/types', {})
    if isinstance(response, list):
        return response
    raise ValueError('Invalid return value from api_call')
api_test_ping()
async
Check your connection to the API server.
Source code in shadowserver_api/async_api.py
async def api_test_ping(self) -> dict[str, typing.Any]:
    """Check your connection to the API server.

    Raises:
        ValueError: If the ``test/ping`` result is not a dict.
    """
    response = await self.api_call('test/ping', {})
    if isinstance(response, dict):
        return response
    raise ValueError('Invalid return value from api_call')