Secure Code Review -1 | Cheat sheet For Security Vulnerability In Python — Injection Flaws
Based on the OWASP Top 10 vulnerabilities. This time we are looking for secure coding bugs related to injection flaws.
1) Path Traversal Attack
- Vulnerable Code Block
def get_video(self, path=None):
    self.check_user_auth()
    data = None
    if not path:
        path = self.get_video_path()
        path = path[0] if path else None
    if path:
        with open(path, 'rb') as f:
            data = bytearray(f.read())
    return data
- Impact
Allowing users to pass the video path as a parameter makes the function vulnerable to path traversal. By manipulating the path, an adversary could read any file on the server that the application process can access, for example get_video('/etc/passwd').
- Fix
Avoid using untrusted input to build a file path. In the fixed version the application builds the path on the server side instead of trusting an input value, so an attacker cannot carry out a path traversal attack by manipulating input parameters.
def get_video(self):
    self.check_user_auth()
    data = None
    path = self.get_video_path()
    if path:
        with open(path[0], 'rb') as f:
            data = bytearray(f.read())
    return data
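When a caller-supplied file name cannot be avoided, a common complementary check is to resolve the path and confirm it still sits inside a fixed base directory. The sketch below is one way to do that with the standard library; MEDIA_ROOT and resolve_inside are hypothetical names that do not appear in the code above.

```python
import os

# Hypothetical media root; the original code does not define one.
MEDIA_ROOT = "/srv/app/videos"


def resolve_inside(base_dir, user_path):
    """Return an absolute path under base_dir, or raise ValueError."""
    base = os.path.realpath(base_dir)
    # realpath collapses ".." segments and follows symlinks before the check.
    candidate = os.path.realpath(os.path.join(base, user_path))
    # commonpath returns the longest shared parent directory of both paths.
    if os.path.commonpath([base, candidate]) != base:
        raise ValueError("path escapes the media directory: %r" % user_path)
    return candidate
```

With this helper, resolve_inside(MEDIA_ROOT, "clip.mp4") returns a path under the media root, while "../../etc/passwd" and "/etc/passwd" both raise ValueError.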
2) OS Command Injection
- Vulnerable Code Block
def insert_from_yaml(self, client_yaml):
    client = yaml.safe_load(os.popen('cat %s' % client_yaml))
    if type(client) is not dict:
        e = ValueError('File %s could not be loaded.' % client_yaml)
        logger.error('%s' % e)
        raise e

    number = client.get('number')
    if number:
        number = self.hasher.encode(number, self.hasher.salt())

    try:
        conn = Connect()
        cursor = conn.db.cursor()
        cursor.execute(
            """
            INSERT INTO client (username, password,
            firstname, lastname, number, email_val, description_val)
            VALUES (%(username)s, %(password)s, %(firstname)s,
            %(lastname)s, %(number)s, %(email_val)s,
            %(description_val)s)
            """,
            {
                'username': client.get('username'),
                'password': self.hasher.encode(
                    client.get('password'), self.hasher.salt()),
                'firstname': client.get('firstname'),
                'lastname': client.get('lastname'),
                'number': number,
                'email_val': client.get('email_val'),
                'description_val': client.get('description_val')
            }
        )
        conn.db.commit()
        logger.info('Client %02d inserted' % cursor.lastrowid)
    except MySQLError as e:
        logger.error('Could not insert client: %s' % e)
    finally:
        conn.db.close()

def delete(self, client_id):
    try:
        conn = Connect()
        result = conn.db.cursor().execute(
            """
            DELETE FROM client WHERE id=%(id)s
            """, {'id': client_id}
        )
        conn.db.commit()
        if result:
            logger.warning('Client id %02d deleted' % client_id)
        else:
            logger.warning('Client does not exist')
    except MySQLError as e:
        logger.error('Could not delete client: %s' % e)
    finally:
        conn.db.close()
- Impact
Using an unsanitized input value as a parameter in an OS command call might allow an attacker to inject malicious OS commands to carry out unauthorized operations in the server running the application.
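To make the impact concrete, the short sketch below builds the same command string as the vulnerable code without executing it. The file name is a hypothetical attacker-supplied value, and the harmless id command stands in for whatever the attacker would actually run.

```python
# Hypothetical attacker-controlled file name; "id" stands in for any command.
client_yaml = "clients.yaml; id"

# The same string formatting the vulnerable code hands to os.popen().
command = 'cat %s' % client_yaml

# A shell treats ";" as a command separator, so it would run two commands:
# "cat clients.yaml" and then "id".
print(command)
```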
- Fix
Avoid using unsanitized input in OS command calls. It is also safer to use language libraries to perform the operation instead of invoking system-level commands. In the fixed version the application reads the file with Python's built-in open function instead of calling the OS command cat. Because no OS command is invoked, the command injection risk is removed.
def insert_from_yaml(self, client_yaml):
    # Read the file with open() instead of shelling out to cat
    with open(client_yaml, 'r') as stream:
        client = yaml.safe_load(stream)
    if type(client) is not dict:
        e = ValueError('File %s could not be loaded.' % client_yaml)
        logger.error('%s' % e)
        raise e

    number = client.get('number')
    if number:
        number = self.hasher.encode(number, self.hasher.salt())

    try:
        conn = Connect()
        cursor = conn.db.cursor()
        cursor.execute(
            """
            INSERT INTO client (username, password,
            firstname, lastname, number, email_val, description_val)
            VALUES (%(username)s, %(password)s, %(firstname)s,
            %(lastname)s, %(number)s, %(email_val)s,
            %(description_val)s)
            """,
            {
                'username': client.get('username'),
                'password': self.hasher.encode(
                    client.get('password'), self.hasher.salt()),
                'firstname': client.get('firstname'),
                'lastname': client.get('lastname'),
                'number': number,
                'email_val': client.get('email_val'),
                'description_val': client.get('description_val')
            }
        )
        conn.db.commit()
        logger.info('Client %02d inserted' % cursor.lastrowid)
    except MySQLError as e:
        logger.error('Could not insert client: %s' % e)
    finally:
        conn.db.close()

def delete(self, client_id):
    try:
        conn = Connect()
        result = conn.db.cursor().execute(
            """
            DELETE FROM client WHERE id=%(id)s
            """, {'id': client_id}
        )
        conn.db.commit()
        if result:
            logger.warning('Client id %02d deleted' % client_id)
        else:
            logger.warning('Client does not exist')
    except MySQLError as e:
        logger.error('Could not delete client: %s' % e)
    finally:
        conn.db.close()
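If an external program really has to be called, one widely used option is subprocess.run with an argument list and no shell, so that the input reaches the program as a single argument and is never parsed by a shell. The sketch below is only an illustration: the payload is hypothetical, and the child process is the Python interpreter itself so the example runs on any platform.

```python
import subprocess
import sys

# Hypothetical hostile value, shaped like an injection attempt.
payload = "clients.yaml; echo injected"

# The child process is the Python interpreter, used only so the sketch runs
# anywhere; it prints back the single argument it received.
result = subprocess.run(
    [sys.executable, "-c", "import sys; print(sys.argv[1])", payload],
    capture_output=True,
    text=True,
    check=True,
)

# No shell is involved, so the ";" is plain data and nothing extra executes.
received = result.stdout.strip()
```

Here received equals the payload unchanged, which shows that the semicolon was treated as data rather than as a command separator.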
3) OS Command Injection
- Vulnerable Code Block
class ProviderJSON(Provider):
    """Provider JSON operations"""

    def insert_from_url(self, json_url):
        o = urlparse(json_url)
        # JSON URL validation
        if not (o.scheme == 'https' and o.hostname in ALLOWED_HOSTS):
            logger.error('JSON URL host is not allowed')
            return
        providers = []
        BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        json_file = os.path.join(BASE_DIR, '_provider.json')
        # GET providers list from JSON URL
        try:
            os.system('wget -O %s %s' % (json_file, json_url))
            with open(json_file) as f:
                providers = json.load(f)
            os.remove(json_file)
        except Exception as e:
            logger.error('Could not GET the JSON URL - %s' % e)
            raise e
        # Insert Providers JSON list
        self.insert(providers)
- Impact
Downloading files by passing a formatted string to os.system makes the application vulnerable to OS command injection. An adversary could supply a json_url value that carries a shell expression (e.g. URL; rm -fr /), which the shell on the production server would then execute. The scheme and host check does not prevent this, because the payload can sit after the host part of the URL.
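The sketch below illustrates why the URL check alone is not enough. It parses a hypothetical malicious URL the same way the vulnerable code does and builds the wget command string without running it. The ALLOWED_HOSTS value is an assumption, since the real list is not shown, and echo stands in for a destructive command.

```python
from urllib.parse import urlparse

# Hypothetical allowlist; the real ALLOWED_HOSTS is not shown in the code.
ALLOWED_HOSTS = {"example.com"}

# Hypothetical attacker-supplied URL with a shell expression appended.
json_url = "https://example.com/providers.json; echo injected"
o = urlparse(json_url)

# The same check as the vulnerable code: only scheme and hostname are examined.
passes_validation = o.scheme == "https" and o.hostname in ALLOWED_HOSTS

# The same string formatting the vulnerable code hands to os.system().
command = "wget -O %s %s" % ("_provider.json", json_url)
print(passes_validation, command)
```

The check passes because the hostname is still example.com, yet the command string ends with a second shell command.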
- Fix
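To mitigate OS command injection attacks, the program must not pass input from untrusted sources to OS commands. In the fixed version the application fetches the JSON with requests.get instead of shelling out to wget, so no shell ever interprets the URL.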
class ProviderJSON(Provider):
    """Provider JSON operations"""

    def insert_from_url(self, json_url):
        o = urlparse(json_url)
        # JSON URL validation
        if not (o.scheme == 'https' and o.hostname in ALLOWED_HOSTS):
            logger.error('JSON URL host is not allowed')
            return
        providers = []
        BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        json_file = os.path.join(BASE_DIR, '_provider.json')
        # GET providers list from JSON URL
        try:
            r = requests.get(o.geturl())
            providers = r.json()
        except Exception as e:
            logger.error('Could not GET the JSON URL - %s' % e)
            raise e
        # Insert Providers JSON list
        self.insert(providers)
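The HTTP call itself can be hardened a little further. The following is a minimal sketch, not the article's implementation: fetch_providers, the http_get parameter, the ALLOWED_HOSTS value, and the assumption that the payload is a JSON list are all hypothetical. It adds a timeout, rejects HTTP error statuses, and disables redirects so a response cannot silently lead away from the allowlisted host.

```python
from urllib.parse import urlparse

# Hypothetical allowlist; the real ALLOWED_HOSTS is not shown in the code.
ALLOWED_HOSTS = {"example.com"}


def fetch_providers(json_url, http_get=None, timeout=10):
    """Fetch a JSON list of providers from an allowlisted HTTPS URL."""
    o = urlparse(json_url)
    if not (o.scheme == "https" and o.hostname in ALLOWED_HOSTS):
        raise ValueError("JSON URL host is not allowed")
    if http_get is None:
        # Third-party library, imported lazily so a stub can be passed instead.
        import requests
        http_get = requests.get
    # allow_redirects=False keeps a redirect from leaving the allowlisted host.
    response = http_get(json_url, timeout=timeout, allow_redirects=False)
    response.raise_for_status()
    providers = response.json()
    # Assumption: the endpoint returns a JSON list of provider records.
    if not isinstance(providers, list):
        raise ValueError("expected a JSON list of providers")
    return providers
```

Passing the HTTP function as a parameter is a design choice made here so the function can be exercised with a stub; by default it falls back to requests.get.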