Secure Code Review -1 | Cheat sheet For Security Vulnerability In Python — Injection Flaws

Divyanshu
InfoSec Write-ups
Published in
4 min readMay 25, 2022

--

Based on OWASP Top-10 Vulnerabilities. This time we are looking for secure coding bugs related to Injection Flaws

1) Path Traversal Attack

  • Vulnerable Code Block
def get_video(self, path=None):
self.check_user_auth()
data = None
if not path:
path = self.get_video_path()
path = path[0] if path else None
if path:
with open(path, 'rb') as f:
data = bytearray(f.read())
return data
  • Impact

Allowing users to pass the video path as a parameter makes it vulnerable to path traversal injection. An adversary could get access to any file on the server manipulating the path (get_video('/etc/passwd')).

  • Fix

It’s recommended to avoid using untrusted input to build a file’s path. The application is building the path parameter on server-side instead of trusting an input value. In this way, an attacker will not be able to carry out a Path Traversal attack by manipulating input parameters.

def get_video(self):
self.check_user_auth()
data = None
path = self.get_video_path()
if path:
with open(path[0], 'rb') as f:
data = bytearray(f.read())
return data

2) OS Command Injection

  • Vulnerable Code Block
def insert_from_yaml(self, client_yaml):client = yaml.safe_load(os.popen('cat %s' % client_yaml))if type(client) is not dict:
e = ValueError('File %s could not be loaded.' % client_yaml)
logger.error('%s' % e)
raise e
number = client.get('number')
if number:
number = self.hasher.encode(number, self.hasher.salt())
try:
conn = Connect()
cursor = conn.db.cursor()
cursor.execute(
"""
INSERT INTO client (username, password,
firstname, lastname, number, email_val, description_val)
VALUES (%(username)s, %(password)s, %(firstname)s,
%(lastname)s, %(number)s, %(email_val)s,
%(description_val)s)
""",
{
'username': client.get('username'),
'password': self.hasher.encode(
client.get('password'), self.hasher.salt()),
'firstname': client.get('firstname'),
'lastname': client.get('lastname'),
'number': number,
'email_val': client.get('email_val'),
'description_val': client.get('description_val')
}
)
conn.db.commit()
logger.info('Client %02d inserted' % cursor.lastrowid)
except MySQLError as e:
logger.error('Could not insert client: %s' % e)
finally:
conn.db.close()
def delete(self, client_id):
try:
conn = Connect()
result = conn.db.cursor().execute(
"""
DELETE FROM client WHERE id=%(id)s
""", {'id': client_id}
)
conn.db.commit()
if result:
logger.warning('Client id %02d deleted' % client_id)
else:
logger.warning('Client does not exist')
except MySQLError as e:
logger.error('Could not delete client: %s' % e)
finally:
conn.db.close()
  • Impact

Using an unsanitized input value as a parameter in an OS command call might allow an attacker to inject malicious OS commands to carry out unauthorized operations in the server running the application.

  • Fix

It is recommended to avoid using unsanitized input parameters in OS command calls. Additionally, it is safer to use language libraries to perform the operation instead of invoking system-level commands. The Python application is reading the file using the open method instead of calling the OS command cat. In this way, as there is no OS command invocation, the injection risk is mitigated.

def insert_from_yaml(self, client_yaml):client = yaml.safe_load(os.popen('cat %s' % client_yaml))
with open(client_yaml, 'r') as stream:
client = yaml.safe_load(stream)
if type(client) is not dict:
e = ValueError('File %s could not be loaded.' % client_yaml)
logger.error('%s' % e)
raise e
number = client.get('number')
if number:
number = self.hasher.encode(number, self.hasher.salt())
try:
conn = Connect()
cursor = conn.db.cursor()
cursor.execute(
"""
INSERT INTO client (username, password,
firstname, lastname, number, email_val, description_val)
VALUES (%(username)s, %(password)s, %(firstname)s,
%(lastname)s, %(number)s, %(email_val)s,
%(description_val)s)
""",
{
'username': client.get('username'),
'password': self.hasher.encode(
client.get('password'), self.hasher.salt()),
'firstname': client.get('firstname'),
'lastname': client.get('lastname'),
'number': number,
'email_val': client.get('email_val'),
'description_val': client.get('description_val')
}
)
conn.db.commit()
logger.info('Client %02d inserted' % cursor.lastrowid)
except MySQLError as e:
logger.error('Could not insert client: %s' % e)
finally:
conn.db.close()
def delete(self, client_id):
try:
conn = Connect()
result = conn.db.cursor().execute(
"""
DELETE FROM client WHERE id=%(id)s
""", {'id': client_id}
)
conn.db.commit()
if result:
logger.warning('Client id %02d deleted' % client_id)
else:
logger.warning('Client does not exist')
except MySQLError as e:
logger.error('Could not delete client: %s' % e)
finally:
conn.db.close()

OS Command Injection

  • Vulnerable Code Block
class ProviderJSON(Provider):
"""Provider JSON operations"""
def insert_from_url(self, json_url):
o = urlparse(json_url)
# JSON URL validation
if not (o.scheme == 'https' and o.hostname in ALLOWED_HOSTS):
logger.error('JSON URL host is not allowed')
return
providers = []
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
json_file = os.path.join(BASE_DIR, '_provider.json')
# GET providers list from JSON URL
try:
os.system('wget -O %s %s' % (json_file, json_url))
with open(json_file) as f:
providers = json.load(f)
os.remove(json_file)
except Exception as e:
logger.error('Could not GET the JSON URL - %s' % e)
raise e
# Insert Providers JSON list
self.insert(providers)
  • Impact

Executing OS commands to download files using os.system makes the application vulnerable to OS command injection attacks. An adversary could replace the json_url value with a malicious expression (e.g. URL; rm -fr /) to be executed in the production server.

  • Fix

In order to mitigate, OS command injection attacks the program must not execute inputs as OS commands from untrusted sources to read files.

class ProviderJSON(Provider):
"""Provider JSON operations"""
def insert_from_url(self, json_url):
o = urlparse(json_url)
# JSON URL validation
if not (o.scheme == 'https' and o.hostname in ALLOWED_HOSTS):
logger.error('JSON URL host is not allowed')
return
providers = []
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
json_file = os.path.join(BASE_DIR, '_provider.json')
# GET providers list from JSON URL
try:
r = requests.get(o.geturl())
providers = r.json()
except Exception as e:
logger.error('Could not GET the JSON URL - %s' % e)
raise e
# Insert Providers JSON list
self.insert(providers)

Reference:

--

--