spark hive job - renaming S3 dirs is very slow

Сабина
Уже с Приветом
Posts: 19041
Joined: 11 Jan 2012 09:25
Location: CA

spark hive job - renaming S3 dirs is very slow

Post by Сабина »

Запустила spark job для экспорта данных redshift -> hive . 100 mln records заняло аж 15 часов. :bad:
Копнула логи, а там все выполнено за 15 минут, а остальное время идет s3 directories renaming ( way how hive works from what I understand)
19362: INFO [2017-11-20 20:08:31,998] ({pool-2-thread-27} Hive.java[moveFile]:2642) - Renaming src: s3://..../.hive-staging_hive_2017-11-20_19-48-35_181_1682270180271573182-3/..../part-00005-db748631-e223-4d02-a548-5d6b6890ea3c.c000, dest: s3://...../part-00005-db748631-e223-4d02-a548-5d6b6890ea3c.c000, Status:true
Непонятно почему это все выполняется одним тредом все это время :shock: ?
Так задумано или что не подкручено в EMR кластере/Hive/Spark .... ?
https://www.youtube.com/watch?v=wOwblaKmyVw
iDesperado
Уже с Приветом
Posts: 1349
Joined: 28 Nov 2008 17:50

Re: spark hive job - renaming S3 dirs is very slow

Post by iDesperado »

думаю начать стоит с выяснения, что такое хайв ? что за енжин под хайвом ? там parquret файлики, hbase, что-то другое под низом ?
мы обычно пишем в parquet файлики, а потом командой CREATE TABLE .. STORED AS PARQUET LOCATION уже в хайф метастор что либо прописываем.
Сабина
Уже с Приветом
Posts: 19041
Joined: 11 Jan 2012 09:25
Location: CA

Re: spark hive job - renaming S3 dirs is very slow

Post by Сабина »

iDesperado wrote: 21 Nov 2017 20:33 думаю начать стоит с выяснения, что такое хайв ? что за енжин под хайвом ? там parquret файлики, hbase, что-то другое под низом ?
мы обычно пишем в parquet файлики, а потом командой CREATE TABLE .. STORED AS PARQUET LOCATION уже в хайф метастор что либо прописываем.
О! Я как раз так делала сначала, записывала из redshifт сразу в parquet но меня развернули
Там export из redshift делается делается в temp dir в tsv first, parquet тока уже когда попадате в hive.
PS. Нам надо в Hive external table
https://www.youtube.com/watch?v=wOwblaKmyVw
Сабина
Уже с Приветом
Posts: 19041
Joined: 11 Jan 2012 09:25
Location: CA

Re: spark hive job - renaming S3 dirs is very slow

Post by Сабина »

OK, apparently there was some S3 issue which seem to go away now :o
Last edited by Сабина on 22 Nov 2017 00:44, edited 1 time in total.
https://www.youtube.com/watch?v=wOwblaKmyVw
iDesperado
Уже с Приветом
Posts: 1349
Joined: 28 Nov 2008 17:50

Re: spark hive job - renaming S3 dirs is very slow

Post by iDesperado »

а чем-то мотивировали двойную писанину? кстати, все таки интересно, что у EMR за енжин под хайвом ? MR или что посовременней ?
_reality
Уже с Приветом
Posts: 232
Joined: 18 Nov 2014 22:55
Location: SFBA

Re: spark hive job - renaming S3 dirs is very slow

Post by _reality »

Сабина
Уже с Приветом
Posts: 19041
Joined: 11 Jan 2012 09:25
Location: CA

Re: spark hive job - renaming S3 dirs is very slow

Post by Сабина »

iDesperado wrote: 21 Nov 2017 21:40 а чем-то мотивировали двойную писанину? кстати, все таки интересно, что у EMR за енжин под хайвом ? MR или что посовременней ?
spark-redshift драйвер так работает что если ты из редшифтовой таблицы делаешь dataframe, то он создает tempDir for unload, а нашем случае в S3 и по умолчанию tsv


обчественность грузит примерно так и меня заставили это использовать и запускать from notebook

Code: Select all

def loadDataFromRedshift(redshift_table: String,
                         select_query: String,
                         target_hive_table: String
                         ):Unit = {
    // Instance Profile for authentication to AWS resources
    val provider = new InstanceProfileCredentialsProvider()
    val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]
    val token = credentials.getSessionToken
    val awsAccessKey = credentials.getAWSAccessKeyId
    val awsSecretKey = credentials.getAWSSecretKey
    
    //Provide the jdbc url for Amazon Redshift
    val jdbcUsername = "..."
    val jdbcPassword = "..."
    val jdbcHostname = "..."
    val jdbcPort = 5439
    val jdbcDatabase = "....."
    val jdbcUrl = s"jdbc:redshift://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}"
    
    // Create and declare an S3 bucket where the temporary files are written
    val protocol  = "s3n"
    val s3Bucket  = "..."
    val s3TempDir = s"$protocol://$s3Bucket/..../$redshift_table"
    
    val df = spark.read
      .format("com.databricks.spark.redshift")
      .option("url", jdbcUrl)
          .option("tempdir", s3TempDir)
      .option("query", select_query)
      .option("temporary_aws_access_key_id", awsAccessKey)
      .option("temporary_aws_secret_access_key", awsSecretKey)
      .option("temporary_aws_session_token", token)
      .load()
    
    val sourceDF = df.drop("auto_id")
    val tmpTable: String = createOrReplaceTempView(target_hive_table, sourceDF)

    val partitionColumns = List("collection_date")
    val outputColumns = createSelectColumnsStatement(df.columns, spark.table(target_hive_table).columns, partitionColumns).mkString(", ")

    val sqlStatement =
      if (partitionColumns.isEmpty) {
        s"""
           |INSERT INTO TABLE $target_hive_table
           |SELECT $outputColumns
           |FROM $tmpTable
    """.stripMargin
      } else {
        val partitionFieldsStr = partitionColumns.mkString(", ")
        s"""
           |INSERT INTO TABLE $target_hive_table PARTITION($partitionFieldsStr)
           |SELECT $outputColumns, to_date(`collection_timestamp`) AS `collection_dt`
           |FROM $tmpTable
    """.stripMargin
      }
    
    println(s"SQL Insert statement: $sqlStatement")
    
    spark.sql(sqlStatement)
} 
Сама я до этого запускала from laptop

Code: Select all

def importRedshiftResultSetToHive(resultSetQuery: String, hiveTable: String, s3TempDir: String) {

    val redshiftDf = spark.read
      .format("com.databricks.spark.redshift")
      .option("url", jdbcURL)
      .option("query", resultSetQuery)
      .option("tempdir", s3TempDir)
      .option("forward_spark_s3_credentials", true)
      .load()

    //redshiftDf.show()

    import spark.implicits._

    val completeDf = redshiftDf
      .drop("auto_id")
      .withColumn("collection_dt", to_date($"collection_timestamp", "yyyy-MM-dd"))
      .withColumn("collection_timestamp", to_unix_timestamp($"collection_timestamp"))
      .withColumn("event_timestamp", to_unix_timestamp($"event_timestamp"))
      .withColumn("etl_timestamp", to_unix_timestamp($"etl_timestamp"))

    //completeDf.show()
    completeDf.write.format("parquet").insertInto(hiveTable)
  }
Но оно так же работает rename etc
Last edited by Сабина on 22 Nov 2017 00:55, edited 4 times in total.
https://www.youtube.com/watch?v=wOwblaKmyVw
Сабина
Уже с Приветом
Posts: 19041
Joined: 11 Jan 2012 09:25
Location: CA

Re: spark hive job - renaming S3 dirs is very slow

Post by Сабина »

Интересно :great:
Наш девопсовый перец клянется что все что он поменял - это tez.runtime.pipeline.sorter.lazy-allocate.memory на true и потом перестартанул все сервисы оно "залетало". У нас Tez= hive engine
Я не поняла каким местом этот параметр мог помочь, наверное иррелевант. Скорее там какая то сопля была и перезапуск всех сервисов помог.
У меня последний день тут, так уж умничаю напоследок :oops: , ну и вообще люблю разбираться с performance до конца. Но тут не получилось - трудно без пермишенсов да еще и когда кто-то посторонний меняет и перестартует фиг знает что
https://www.youtube.com/watch?v=wOwblaKmyVw
User avatar
metaller
Уже с Приветом
Posts: 4532
Joined: 06 Jul 2011 12:22
Location: Oak Harbor, WA

Re: spark hive job - renaming S3 dirs is very slow

Post by metaller »

Сабина wrote: 22 Nov 2017 00:41
Интересно :great:
Наш девопсовый перец клянется что все что он поменял - это tez.runtime.pipeline.sorter.lazy-allocate.memory на true и потом перестартанул все сервисы оно "залетало". У нас Tez= hive engine
Я не поняла каким местом этот параметр мог помочь, наверное иррелевант. Скорее там какая то сопля была и перезапуск всех сервисов помог.
У меня последний день тут, так уж умничаю напоследок :oops: , ну и вообще люблю разбираться с performance до конца. Но тут не получилось - трудно без пермишенсов да еще и когда кто-то посторонний меняет и перестартует фиг знает что
Попробуйте без TEZ, может помочь. За мою недолгую (1.5 года) карьеру Big Data developer слово TEZ стало синонимом слова ГЛЮК ;)
Я не хочу оплачивать ничьи аборты, велферы или детсады. Каждый должен сам принимать решения и нести за них ответственность.
iDesperado
Уже с Приветом
Posts: 1349
Joined: 28 Nov 2008 17:50

Re: spark hive job - renaming S3 dirs is very slow

Post by iDesperado »

с налету не нашел описания как работает completeDf.write.format("parquet").insertInto(hiveTable)
вдруг оно SQL в хайв отправляет, а не то что мы подумали ?

у меня другой вариант был: completeDf.write.parquet("folder") а потом уже на фолдер надевать хайв таблички
Сабина
Уже с Приветом
Posts: 19041
Joined: 11 Jan 2012 09:25
Location: CA

Re: spark hive job - renaming S3 dirs is very slow

Post by Сабина »

metaller wrote: 22 Nov 2017 02:43 Попробуйте без TEZ, может помочь. За мою недолгую (1.5 года) карьеру Big Data developer слово TEZ стало синонимом слова ГЛЮК ;)
Ой на больной мозоль наступаете :)! В смысле что Tez притащил тот самый девопсовый гай, который "лучше девелоперов знает что делать". Ну и сами понимаете стартап, то есть с кем работать тим не выбирает - кого дали, то бишь.

Меня еще одна вещь мучает ( точнее несколько :D , но теперь уже задним числом).
Там кластер был сконфигурирован так:
- MASTER node,
- CORE node
- TASK node ( автоскейлится от 1 до 10).

Смущали следующие вещи:
1) Zeppelin был сконфигурирован так, что все ,запущенное в разных ноутбуках (разными девелоперами, пользующимися одним общим логином) проходило как один application ID. Логи трудно разгребать, все в куче. По моему это не самый эффективный способ конфигурировать notebook for multiple developers
2) В DAG видно что для некоторых задач запускаются десятки экзекьюторов, но все они почему то на CORE node, то есть IP core node, только порты разные. А почему TASK node(s) вообще не используется :pain1: ? Это тоже не поняла ?

А почему ваша карьера big data engineer недолгая ? забросили , непонравилась ? или только начинаете ?
https://www.youtube.com/watch?v=wOwblaKmyVw
Сабина
Уже с Приветом
Posts: 19041
Joined: 11 Jan 2012 09:25
Location: CA

Re: spark hive job - renaming S3 dirs is very slow

Post by Сабина »

iDesperado wrote: 22 Nov 2017 14:39 с налету не нашел описания как работает completeDf.write.format("parquet").insertInto(hiveTable)
вдруг оно SQL в хайв отправляет, а не то что мы подумали ?

у меня другой вариант был: completeDf.write.parquet("folder") а потом уже на фолдер надевать хайв таблички
как это sql отправляет ? Отправляло что надо насколько я видела :).
в вашем случае как я поняла вы писали не в external table ... , в нашем мы сначала создавали CREATE AS EXTERNAL TABLe с нужными партициями, а потом в них экспортировали данные из редшифта
https://www.youtube.com/watch?v=wOwblaKmyVw
User avatar
metaller
Уже с Приветом
Posts: 4532
Joined: 06 Jul 2011 12:22
Location: Oak Harbor, WA

Re: spark hive job - renaming S3 dirs is very slow

Post by metaller »

Сабина wrote: 23 Nov 2017 13:41
metaller wrote: 22 Nov 2017 02:43 Попробуйте без TEZ, может помочь. За мою недолгую (1.5 года) карьеру Big Data developer слово TEZ стало синонимом слова ГЛЮК ;)
Ой на больной мозоль наступаете :)! В смысле что Tez притащил тот самый девопсовый гай, который "лучше девелоперов знает что делать". Ну и сами понимаете стартап, то есть с кем работать тим не выбирает - кого дали, то бишь.

Меня еще одна вещь мучает ( точнее несколько :D , но теперь уже задним числом).
Там кластер был сконфигурирован так:
- MASTER node,
- CORE node
- TASK node ( автоскейлится от 1 до 10).

Смущали следующие вещи:
1) Zeppelin был сконфигурирован так, что все ,запущенное в разных ноутбуках (разными девелоперами, пользующимися одним общим логином) проходило как один application ID. Логи трудно разгребать, все в куче. По моему это не самый эффективный способ конфигурировать notebook for multiple developers
2) В DAG видно что для некоторых задач запускаются десятки экзекьюторов, но все они почему то на CORE node, то есть IP core node, только порты разные. А почему TASK node(s) вообще не используется :pain1: ? Это тоже не поняла ?

А почему ваша карьера big data engineer недолгая ? забросили , непонравилась ? или только начинаете ?
Потому что к тому времени получил GC и решил сменить род деятельности на не-IT. 20 лет по клаве стучал, хватит уже ;)
Я не хочу оплачивать ничьи аборты, велферы или детсады. Каждый должен сам принимать решения и нести за них ответственность.
Сабина
Уже с Приветом
Posts: 19041
Joined: 11 Jan 2012 09:25
Location: CA

Re: spark hive job - renaming S3 dirs is very slow

Post by Сабина »

metaller wrote: 24 Nov 2017 02:56 Потому что к тому времени получил GC и решил сменить род деятельности на не-IT. 20 лет по клаве стучал, хватит уже ;)
А! Ну да. Память короткая ;)
https://www.youtube.com/watch?v=wOwblaKmyVw

Return to “Вопросы и новости IT”