|
@@ -0,0 +1,204 @@
|
|
|
|
+import hashlib
|
|
|
|
+import struct
|
|
|
|
+from collections import defaultdict
|
|
|
|
+
|
|
|
|
+from telecaster.Clients.PrestaShopClient import PrestaShopClient
|
|
|
|
+
|
|
|
|
+CANONICAL_DOMAIN = "https://cosmohome.gr"
|
|
|
|
+TOKEN = "IK2GQGGKH4LUVYR9C2TSF6XC51F1J1C3"
|
|
|
|
+
|
|
|
|
+COLOR_PRODUCT_OPTION_IDS = ['5', '17', '11']
|
|
|
|
+SIZE_PRODUCT_OPTION_IDS = ['1', '13', '16', '3']
|
|
|
|
+
|
|
|
|
+TAX = 0.24
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def numHash(s, length=None):
|
|
|
|
+ hash_object = hashlib.md5(s.encode('ISO-8859-1')).digest()
|
|
|
|
+ hash_str, _ = struct.unpack('>II', hash_object[:8])
|
|
|
|
+ if length and isinstance(length, int):
|
|
|
|
+ hash_str = str(hash_str)[:length]
|
|
|
|
+ return str(hash_str)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def get_nested(data, map_list):
|
|
|
|
+ for k in map_list:
|
|
|
|
+ data = data.get(k, None)
|
|
|
|
+ if data is None:
|
|
|
|
+ return None
|
|
|
|
+ return data
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class CosmohomeClient(PrestaShopClient):
|
|
|
|
+ def __init__(self, base_url=None, token=None):
|
|
|
|
+ super().__init__(base_url or CANONICAL_DOMAIN, token or TOKEN)
|
|
|
|
+ self.caches = defaultdict(dict)
|
|
|
|
+
|
|
|
|
+ def update_cache(self, cache_key, values: list, id_key_path):
|
|
|
|
+ cache_dict = self.caches[cache_key]
|
|
|
|
+ for item in values:
|
|
|
|
+ key = get_nested(item, id_key_path)
|
|
|
|
+ cache_dict[key] = item
|
|
|
|
+
|
|
|
|
+ def load_cache(self):
|
|
|
|
+ categories = self.get_categories() # {'display': '[id,name, link_rewrite, id_parent]'})
|
|
|
|
+ features = self.get_features()
|
|
|
|
+ feature_values = self.get_feature_values()
|
|
|
|
+ product_options = self.get_product_options()
|
|
|
|
+ product_option_values = self.get_product_option_values()
|
|
|
|
+ # tax_rule_groups = self.get_tax_rule_groups()
|
|
|
|
+ # tax_rules = self.get_tax_rules()
|
|
|
|
+ # taxes = self.get_taxes()
|
|
|
|
+
|
|
|
|
+ categories = map(lambda x: x['category'], categories)
|
|
|
|
+ features = map(lambda x: x['product_feature'], features)
|
|
|
|
+ feature_values = map(lambda x: x['product_feature_value'], feature_values)
|
|
|
|
+ product_options = map(lambda x: x['product_option'], product_options)
|
|
|
|
+ product_option_values = map(lambda x: x['product_option_value'],
|
|
|
|
+ product_option_values)
|
|
|
|
+ # tax_rule_groups = map(lambda x: x['tax_rule_group'], tax_rule_groups)
|
|
|
|
+ # tax_rules = map(lambda x: x['tax_rule'], tax_rules)
|
|
|
|
+ # taxes = map(lambda x: x['tax'], taxes)
|
|
|
|
+
|
|
|
|
+ self.update_cache('categories', categories, ['id'])
|
|
|
|
+ self.update_cache('features', features, ['id'])
|
|
|
|
+ self.update_cache('feature_values', feature_values, ['id'])
|
|
|
|
+ self.update_cache('product_options', product_options, ['id'])
|
|
|
|
+ self.update_cache('product_option_values', product_option_values, ['id'])
|
|
|
|
+ # self.update_cache('tax_rule_groups', tax_rule_groups, ['id'])
|
|
|
|
+ # self.update_cache('tax_rules', tax_rules, ['id'])
|
|
|
|
+ # self.update_cache('taxes', taxes, ['id'])
|
|
|
|
+
|
|
|
|
+ def parse_product(self, product, combinations):
|
|
|
|
+ product = product.get('product')
|
|
|
|
+ product_combinations = self.get_product_combinations(product, combinations)
|
|
|
|
+
|
|
|
|
+ # Move this to pre-computed cache when perform batches
|
|
|
|
+ id_product_attributes = [c.get('id') for c in product_combinations]
|
|
|
|
+ sp = self.get_specific_prices(
|
|
|
|
+ {'filter[id_product_attribute]': f"{'|'.join(id_product_attributes)}"})
|
|
|
|
+ sp = list(map(lambda x: x['specific_price'], sp))
|
|
|
|
+ self.update_cache('specific_prices', sp, ['id_product_attribute'])
|
|
|
|
+ self.update_cache('specific_prices', sp, ['id_product'])
|
|
|
|
+ ## END OF Specific prices calculations
|
|
|
|
+
|
|
|
|
+ serialized_product = {
|
|
|
|
+ "productId": product.get('id'),
|
|
|
|
+ "title": self.sanitize_language(product.get('name')),
|
|
|
|
+ "productURL": product.get('link_rewrite'),
|
|
|
|
+ # add to the url the attributes like #/attr_id/attr_id/...
|
|
|
|
+ "Category": self.get_category_tree(product.get('id_category_default'),
|
|
|
|
+ min_depth=1),
|
|
|
|
+ "Category_ID": product.get('id_category_default'),
|
|
|
|
+ "weight": product.get('weight'),
|
|
|
|
+ "Manufacturer": product.get('manufacturer_name'),
|
|
|
|
+ "MPN": product.get('reference') or product.get('mpn'),
|
|
|
|
+ "Barcode": product.get('ean13'),
|
|
|
|
+ "Price": self.calculate_final_price(product),
|
|
|
|
+ "image": self.generate_image_url(product),
|
|
|
|
+ "stock": "Y" if int(product.get('quantity')) > 0 else "N",
|
|
|
|
+ "Availability": "Available from 1 to 3 days" if int(
|
|
|
|
+ product.get('quantity')) > 0 else "",
|
|
|
|
+ "Quantity": product.get('quantity'),
|
|
|
|
+ "Color": self.get_color(product),
|
|
|
|
+ "Size": self.get_size(product),
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ serialized_combinations = []
|
|
|
|
+ for combination in product_combinations:
|
|
|
|
+ serialized_combinations.append({
|
|
|
|
+ **serialized_product,
|
|
|
|
+ "productId": product.get('id') + numHash(combination.get('reference'), 10),
|
|
|
|
+ "Barcode": combination.get('ean13') or serialized_product.get(
|
|
|
|
+ 'Barcode'),
|
|
|
|
+ "Name": combination.get('name'),
|
|
|
|
+ "weight": product.get('weight'),
|
|
|
|
+ "MPN": combination.get('reference') or combination.get(
|
|
|
|
+ 'mpn') or serialized_product.get('MPN'),
|
|
|
|
+
|
|
|
|
+ "Price": self.calculate_final_price(
|
|
|
|
+ combination,
|
|
|
|
+ float(product.get('price')) + float(combination.get('price'))
|
|
|
|
+ ),
|
|
|
|
+ "Quantity": combination.get('quantity') or serialized_product.get(
|
|
|
|
+ 'Quantity'),
|
|
|
|
+
|
|
|
|
+ "Color": self.get_color(product) or serialized_product.get('Color'),
|
|
|
|
+ "Size": self.get_size(product) or serialized_product.get('Size'),
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ return [serialized_product] if not combinations else serialized_combinations
|
|
|
|
+
|
|
|
|
+ def sanitize_language(self, value):
|
|
|
|
+ if isinstance(value, str):
|
|
|
|
+ return value.strip()
|
|
|
|
+ elif isinstance(value, dict):
|
|
|
|
+ return list(value.values())[0]
|
|
|
|
+ else:
|
|
|
|
+ return value
|
|
|
|
+
|
|
|
|
+ def get_category_tree(self, category_id, min_depth=0):
|
|
|
|
+ categories = self.caches['categories']
|
|
|
|
+ category = categories[category_id]
|
|
|
|
+ category_tree = []
|
|
|
|
+ while category and int(category.get('level_depth')) > min_depth:
|
|
|
|
+ if category:
|
|
|
|
+ category_tree.insert(0, category['name'])
|
|
|
|
+ category = categories.get(category.get('id_parent'))
|
|
|
|
+ else:
|
|
|
|
+ break
|
|
|
|
+
|
|
|
|
+ return [self.sanitize_language(c) for c in category_tree]
|
|
|
|
+
|
|
|
|
+ def generate_image_url(self, product):
|
|
|
|
+ image_id = product.get('id_default_image')
|
|
|
|
+ link_rewrite = self.sanitize_language(product.get('link_rewrite'))
|
|
|
|
+ return f"{CANONICAL_DOMAIN}/{image_id}-thickbox_default/{link_rewrite}.jpg"
|
|
|
|
+
|
|
|
|
+ def get_color(self, product):
|
|
|
|
+ options = product.get('associations').get('product_option_values')
|
|
|
|
+ for option in options:
|
|
|
|
+ option = option.get('product_option_value')
|
|
|
|
+ option_value = self.caches['product_option_values'].get(option.get('id'))
|
|
|
|
+ if option_value and option_value.get(
|
|
|
|
+ 'id_attribute_group') in COLOR_PRODUCT_OPTION_IDS:
|
|
|
|
+ return self.sanitize_language(option_value.get('name'))
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+ def get_size(self, product):
|
|
|
|
+ options = product.get('associations').get('product_option_values')
|
|
|
|
+ for option in options:
|
|
|
|
+ option = option.get('product_option_value')
|
|
|
|
+ option_value = self.caches['product_option_values'].get(option.get('id'))
|
|
|
|
+ if option_value and option_value.get(
|
|
|
|
+ 'id_attribute_group') in SIZE_PRODUCT_OPTION_IDS:
|
|
|
|
+ return self.sanitize_language(option_value.get('name'))
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+ def get_product_combinations(self, product, combinations):
|
|
|
|
+ if combinations:
|
|
|
|
+ combinations = map(lambda x: x['combination'], combinations)
|
|
|
|
+ self.update_cache('combinations', combinations, ['id'])
|
|
|
|
+
|
|
|
|
+ combinations = self.caches['combinations']
|
|
|
|
+
|
|
|
|
+ product_combinations = product.get('associations').get('combinations')
|
|
|
|
+ product_combination_ids = map(lambda x: x['combination']['id'],
|
|
|
|
+ product_combinations)
|
|
|
|
+
|
|
|
|
+ return [combinations.get(cid) for cid in product_combination_ids]
|
|
|
|
+
|
|
|
|
+ def calculate_final_price(self, product, price=None):
|
|
|
|
+ if not price:
|
|
|
|
+ price = product.get('price')
|
|
|
|
+
|
|
|
|
+ price_with_tax = float(price) * (1 + TAX)
|
|
|
|
+
|
|
|
|
+ sp = self.caches['specific_prices'].get(product.get('id'))
|
|
|
|
+ discount = sp.get('reduction')
|
|
|
|
+ type = sp.get('reduction_type')
|
|
|
|
+ if type == 'percentage':
|
|
|
|
+ final_price_with_tax = price_with_tax * (1 - float(discount))
|
|
|
|
+ else:
|
|
|
|
+ final_price_with_tax = price_with_tax - float(discount)
|
|
|
|
+ return round(final_price_with_tax, 2)
|